Rename target_ram_msg_count to target_ram_item_count

This commit is contained in:
Rob Harrop 2010-11-11 19:33:43 +00:00
parent 68276333ab
commit 7b2a828ea1
2 changed files with 110 additions and 109 deletions

View File

@ -1892,7 +1892,7 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6 = check_variable_queue_status( VQ6 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ5), rabbit_variable_queue:set_ram_duration_target(0, VQ5),
[{len, Len div 2}, [{len, Len div 2},
{target_ram_msg_count, 0}, {target_ram_item_count, 0},
{ram_msg_count, 0}, {ram_msg_count, 0},
{ram_ack_count, 0}]), {ram_ack_count, 0}]),

View File

@ -158,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to %% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the %% any one time. This further smooths the effects of changes to the
%% target_ram_msg_count and ensures the queue remains responsive %% target_ram_item_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The %% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are %% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains %% done as promptly as possible whilst ensuring the queue remains
@ -257,7 +257,7 @@
len, len,
persistent_count, persistent_count,
target_ram_msg_count, target_ram_item_count,
ram_msg_count, ram_msg_count,
ram_msg_count_prev, ram_msg_count_prev,
ram_ack_count_prev, ram_ack_count_prev,
@ -331,34 +331,34 @@
funs :: [fun (() -> any())] }). funs :: [fun (() -> any())] }).
-type(state() :: #vqstate { -type(state() :: #vqstate {
q1 :: queue(), q1 :: queue(),
q2 :: bpqueue:bpqueue(), q2 :: bpqueue:bpqueue(),
delta :: delta(), delta :: delta(),
q3 :: bpqueue:bpqueue(), q3 :: bpqueue:bpqueue(),
q4 :: queue(), q4 :: queue(),
next_seq_id :: seq_id(), next_seq_id :: seq_id(),
pending_ack :: dict:dictionary(), pending_ack :: dict:dictionary(),
ram_ack_index :: gb_tree(), ram_ack_index :: gb_tree(),
index_state :: any(), index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()}, msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}}, {any(), binary()}},
on_sync :: sync(), on_sync :: sync(),
durable :: boolean(), durable :: boolean(),
len :: non_neg_integer(), len :: non_neg_integer(),
persistent_count :: non_neg_integer(), persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(), transient_threshold :: non_neg_integer(),
target_ram_msg_count :: non_neg_integer() | 'infinity', target_ram_item_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(), ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(), ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(), out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(), in_counter :: non_neg_integer(),
ack_out_counter :: non_neg_integer(), ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(),
rates :: rates(), rates :: rates(),
ack_rates :: rates() }). ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl"). -include("rabbit_backing_queue_spec.hrl").
@ -715,18 +715,18 @@ set_ram_duration_target(DurationTarget,
ack_rates = ack_rates =
#rates { avg_egress = AvgAckEgressRate, #rates { avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate }, avg_ingress = AvgAckIngressRate },
target_ram_msg_count = TargetRamMsgCount }) -> target_ram_item_count = TargetRamItemCount }) ->
Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate
+ AvgAckIngressRate, + AvgAckIngressRate,
TargetRamMsgCount1 = TargetRamItemCount1 =
case DurationTarget of case DurationTarget of
infinity -> infinity; infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end, end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
a(case TargetRamMsgCount1 == infinity orelse a(case TargetRamItemCount1 == infinity orelse
(TargetRamMsgCount =/= infinity andalso (TargetRamItemCount =/= infinity andalso
TargetRamMsgCount1 >= TargetRamMsgCount) of TargetRamItemCount1 >= TargetRamItemCount) of
true -> State1; true -> State1;
false -> reduce_memory_use(State1) false -> reduce_memory_use(State1)
end). end).
@ -799,39 +799,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len, len = Len,
pending_ack = PA, pending_ack = PA,
ram_ack_index = RAI, ram_ack_index = RAI,
on_sync = #sync { funs = From }, on_sync = #sync { funs = From },
target_ram_msg_count = TargetRamMsgCount, target_ram_item_count = TargetRamItemCount,
ram_msg_count = RamMsgCount, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount, ram_index_count = RamIndexCount,
next_seq_id = NextSeqId, next_seq_id = NextSeqId,
persistent_count = PersistentCount, persistent_count = PersistentCount,
rates = #rates { rates = #rates {
avg_egress = AvgEgressRate, avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate }, avg_ingress = AvgIngressRate },
ack_rates = #rates { ack_rates = #rates {
avg_egress = AvgAckEgressRate, avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate } }) -> avg_ingress = AvgAckIngressRate } }) ->
[ {q1 , queue:len(Q1)}, [ {q1 , queue:len(Q1)},
{q2 , bpqueue:len(Q2)}, {q2 , bpqueue:len(Q2)},
{delta , Delta}, {delta , Delta},
{q3 , bpqueue:len(Q3)}, {q3 , bpqueue:len(Q3)},
{q4 , queue:len(Q4)}, {q4 , queue:len(Q4)},
{len , Len}, {len , Len},
{pending_acks , dict:size(PA)}, {pending_acks , dict:size(PA)},
{ram_ack_count , gb_trees:size(RAI)}, {ram_ack_count , gb_trees:size(RAI)},
{outstanding_txns , length(From)}, {outstanding_txns , length(From)},
{target_ram_msg_count , TargetRamMsgCount}, {target_ram_item_count , TargetRamItemCount},
{ram_msg_count , RamMsgCount}, {ram_msg_count , RamMsgCount},
{ram_index_count , RamIndexCount}, {ram_index_count , RamIndexCount},
{next_seq_id , NextSeqId}, {next_seq_id , NextSeqId},
{persistent_count , PersistentCount}, {persistent_count , PersistentCount},
{avg_egress_rate , AvgEgressRate}, {avg_egress_rate , AvgEgressRate},
{avg_ingress_rate , AvgIngressRate}, {avg_ingress_rate , AvgIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate}, {avg_ack_egress_rate , AvgAckEgressRate},
{avg_ack_ingress_rate , AvgAckIngressRate}]. {avg_ack_ingress_rate , AvgAckIngressRate}].
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
%% Minor helpers %% Minor helpers
@ -1021,42 +1021,42 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end, end,
Now = now(), Now = now(),
State = #vqstate { State = #vqstate {
q1 = queue:new(), q1 = queue:new(),
q2 = bpqueue:new(), q2 = bpqueue:new(),
delta = Delta, delta = Delta,
q3 = bpqueue:new(), q3 = bpqueue:new(),
q4 = queue:new(), q4 = queue:new(),
next_seq_id = NextSeqId, next_seq_id = NextSeqId,
pending_ack = dict:new(), pending_ack = dict:new(),
ram_ack_index = gb_trees:empty(), ram_ack_index = gb_trees:empty(),
index_state = IndexState1, index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient}, msg_store_clients = {PersistentClient, TransientClient},
on_sync = ?BLANK_SYNC, on_sync = ?BLANK_SYNC,
durable = IsDurable, durable = IsDurable,
transient_threshold = NextSeqId, transient_threshold = NextSeqId,
len = DeltaCount1, len = DeltaCount1,
persistent_count = DeltaCount1, persistent_count = DeltaCount1,
target_ram_msg_count = infinity, target_ram_item_count = infinity,
ram_msg_count = 0, ram_msg_count = 0,
ram_msg_count_prev = 0, ram_msg_count_prev = 0,
ram_ack_count_prev = 0, ram_ack_count_prev = 0,
ram_index_count = 0, ram_index_count = 0,
out_counter = 0, out_counter = 0,
in_counter = 0, in_counter = 0,
ack_out_counter = 0, ack_out_counter = 0,
ack_in_counter = 0, ack_in_counter = 0,
rates = #rates { egress = {Now, 0}, rates = #rates { egress = {Now, 0},
ingress = {Now, DeltaCount1}, ingress = {Now, DeltaCount1},
avg_egress = 0.0, avg_egress = 0.0,
avg_ingress = 0.0, avg_ingress = 0.0,
timestamp = Now }, timestamp = Now },
ack_rates = #rates { egress = {Now, 0}, ack_rates = #rates { egress = {Now, 0},
ingress = {Now, 0}, ingress = {Now, 0},
avg_egress = 0.0, avg_egress = 0.0,
avg_ingress = 0.0, avg_ingress = 0.0,
timestamp = undefined } }, timestamp = undefined } },
a(maybe_deltas_to_betas(State)). a(maybe_deltas_to_betas(State)).
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
@ -1360,7 +1360,7 @@ find_persistent_count(LensByStore) ->
%% though the conversion function for that is called as necessary. The %% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is %% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a %% only ever necessary just after a transition to a
%% target_ram_msg_count of zero or after an incremental alpha->beta %% target_ram_item_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight %% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas), %% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up %% and in the latter case the need for a conversion is flagged up
@ -1371,23 +1371,23 @@ find_persistent_count(LensByStore) ->
%% perpetually reporting the need for a conversion when no such %% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop. %% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
State = #vqstate {target_ram_msg_count = infinity}) -> State = #vqstate {target_ram_item_count = infinity}) ->
{false, State}; {false, State};
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
State = #vqstate { State = #vqstate {
ram_ack_index = RamAckIndex, ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount, ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount, target_ram_item_count = TargetRamItemCount,
rates = #rates { rates = #rates {
avg_ingress = AvgIngress, avg_ingress = AvgIngress,
avg_egress = AvgEgress }, avg_egress = AvgEgress },
ack_rates = #rates { ack_rates = #rates {
avg_ingress = AvgAckIngress, avg_ingress = AvgAckIngress,
avg_egress = AvgAckEgress } }) -> avg_egress = AvgAckEgress } }) ->
{Reduce, State1} = {Reduce, State1} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
TargetRamMsgCount) of TargetRamItemCount) of
0 -> 0 ->
{false, State}; {false, State};
S1 -> S1 ->
@ -1415,7 +1415,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
{true, StateOut} {true, StateOut}
end, end,
case State1 #vqstate.target_ram_msg_count of case State1 #vqstate.target_ram_item_count of
0 -> {Reduce, BetaDeltaFun(State1)}; 0 -> {Reduce, BetaDeltaFun(State1)};
_ -> case chunk_size(State1 #vqstate.ram_index_count, _ -> case chunk_size(State1 #vqstate.ram_index_count,
permitted_ram_index_count(State1)) of permitted_ram_index_count(State1)) of
@ -1612,10 +1612,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate { State = #vqstate {
ram_msg_count = RamMsgCount, ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount }) target_ram_item_count = TargetRamItemCount })
when Quota =:= 0 orelse when Quota =:= 0 orelse
TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> TargetRamItemCount =:= infinity orelse
TargetRamItemCount >= RamMsgCount ->
{Quota, State}; {Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of case Generator(Q) of