Started recording the acks that are stored as full messages in memory

This commit is contained in:
Rob Harrop 2010-10-11 15:54:48 +01:00
parent 9c60db6b5e
commit 9485410ee9
1 changed files with 36 additions and 25 deletions

View File

@ -220,6 +220,7 @@
q4,
next_seq_id,
pending_ack,
pending_ack_index,
index_state,
msg_store_clients,
on_sync,
@ -305,6 +306,7 @@
q4 :: queue(),
next_seq_id :: seq_id(),
pending_ack :: dict:dictionary(),
pending_ack_index :: gb_trees:gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@ -407,6 +409,7 @@ init(QueueName, IsDurable, Recover) ->
q4 = queue:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
pending_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {{PersistentClient, PRef},
{TransientClient, TRef}},
@ -509,27 +512,24 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
{SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
{SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
pending_ack = PA1 })}.
persistent_count = PCount1 })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
out_counter = OutCount,
index_state = IndexState,
len = Len,
persistent_count = PCount,
pending_ack = PA }) ->
persistent_count = PCount }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_to_q4(State) of
@ -560,24 +560,24 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, PA1} = case AckRequired of
true -> PA2 = record_pending_ack(
MsgStatus #msg_status {
is_delivered = true }, PA),
{SeqId, PA2};
false -> {blank_ack, PA}
{AckTag, State1} = case AckRequired of
true -> StateN = record_pending_ack(
MsgStatus #msg_status {
is_delivered = true },
State),
{SeqId, StateN};
false -> {blank_ack, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
a(State #vqstate { q4 = Q4a,
a(State1 #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len1,
persistent_count = PCount1,
pending_ack = PA1 })}
persistent_count = PCount1})}
end.
ack(AckTags, State) ->
@ -1090,19 +1090,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
true -> {IsPersistent, Guid};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
pending_ack_index = PAI }) ->
{AckEntry, PAI1} =
case MsgOnDisk of
true ->
{{IsPersistent, Guid}, PAI};
false ->
{MsgStatus, gb_trees:insert(SeqId, Guid, PAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1, pending_ack_index = PAI1 }.
%% TODO: On remove, need to prevent any seqids that
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
State1 = State #vqstate { pending_ack = dict:new() },
State1 = State #vqstate { pending_ack = dict:new(),
pending_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
error -> State1;
@ -1124,11 +1132,14 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
pending_ack_index = PAI }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
pending_ack = dict:erase(SeqId, PA) })}
pending_ack = dict:erase(SeqId, PA),
pending_ack_index =
gb_trees:delete_any(SeqId, PAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->