diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e4111f82fe..c59b12dd29 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, write_published/4, write_delivered/2, write_acks/2, - flush_journal/1, read_segment_entries/2]). + flush_journal/1, read_segment_entries/2, next_segment_boundary/1]). %%---------------------------------------------------------------------------- %% The queue disk index @@ -242,6 +242,10 @@ read_segment_entries(InitSeqId, State = end, [], RelSeqs), State}. +next_segment_boundary(SeqId) -> + {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(SegNum + 1, 0). + %%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 784cda3967..73c3c3395b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/1, in/3]). +-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]). -record(vqstate, { q1, @@ -39,12 +39,16 @@ gamma, q3, q4, - egress_rate, target_ram_msg_count, ram_msg_count, queue, index_state, - next_seq_id + next_seq_id, + out_counter, + egress_rate, + old_egress_rate, + avg_egress_rate, + egress_rate_timestamp }). -include("rabbit.hrl"). @@ -54,35 +58,19 @@ init(QueueName) -> #vqstate { q1 = queue:new(), q2 = queue:new(), gamma = 0, q3 = queue:new(), q4 = queue:new(), - egress_rate = 0, target_ram_msg_count = undefined, ram_msg_count = 0, queue = QueueName, index_state = IndexState, - next_seq_id = NextSeqId + next_seq_id = NextSeqId, + out_counter = 0, + egress_rate = 0, + old_egress_rate = 0, + avg_egress_rate = 0, + egress_rate_timestamp = now() }. -maybe_write_msg_to_disk(Bool, Msg = #basic_message { - guid = MsgId, is_persistent = IsPersistent }) - when Bool orelse IsPersistent -> - ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)), - true; -maybe_write_msg_to_disk(_Bool, _Msg) -> - false. - -maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered, - IndexState) when Bool orelse IsPersistent -> - IndexState1 = rabbit_queue_index:write_published( - MsgId, SeqId, IsPersistent, IndexState), - {true, case IsDelivered of - true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); - false -> IndexState1 - end}; -maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered, - IndexState) -> - {false, IndexState}. - -in(Msg = #basic_message {}, IsDelivered, State) -> +in(Msg, IsDelivered, State) -> in(test_keep_msg_in_ram(State), Msg, IsDelivered, State). in(msg_and_index, Msg = #basic_message { guid = MsgId, @@ -110,7 +98,7 @@ in(just_index, Msg = #basic_message { guid = MsgId, {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), - Entry = {index, MsgId, SeqId, IsDelivered, true, IndexOnDisk}, + Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk}, State1 = State #vqstate { next_seq_id = SeqId + 1, index_state = IndexState1 }, true = queue:is_empty(Q1), %% ASSERTION @@ -130,6 +118,73 @@ in(neither, Msg = #basic_message { guid = MsgId, index_state = IndexState1, gamma = Gamma + 1 }. +set_queue_ram_duration_target( + DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, + target_ram_msg_count = TargetRamMsgCount + }) -> + TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, + if TargetRamMsgCount == TargetRamMsgCount1 -> + State1; + TargetRamMsgCount < TargetRamMsgCount1 -> + maybe_start_prefetcher(State1); + true -> + reduce_memory_use(State1) + end. + +remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, + egress_rate_timestamp = Timestamp, + out_counter = OutCount }) -> + Now = now(), + EgressRate = OutCount / timer:now_diff(Now, Timestamp), + AvgEgressRate = (EgressRate + OldEgressRate) / 2, + State #vqstate { old_egress_rate = OldEgressRate, + egress_rate = EgressRate, + avg_egress_rate = AvgEgressRate, + egress_rate_timestamp = Now, + out_counter = 0 }. + +maybe_start_prefetcher(State) -> + %% TODO + State. + +reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) + when TargetRamMsgCount >= RamMsgCount -> + State; +reduce_memory_use(State = + #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> + State1 = #vqstate { ram_msg_count = RamMsgCount } = + maybe_push_q1_to_betas(State), + State2 = case TargetRamMsgCount >= RamMsgCount of + true -> State1; + false -> maybe_push_q4_to_betas(State) + end, + case TargetRamMsgCount of + 0 -> push_betas_to_gammas(State); + _ -> State2 + end. + +maybe_write_msg_to_disk(Bool, Msg = #basic_message { + guid = MsgId, is_persistent = IsPersistent }) + when Bool orelse IsPersistent -> + ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)), + true; +maybe_write_msg_to_disk(_Bool, _Msg) -> + false. + +maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered, + IndexState) when Bool orelse IsPersistent -> + IndexState1 = rabbit_queue_index:write_published( + MsgId, SeqId, IsPersistent, IndexState), + {true, case IsDelivered of + true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); + false -> IndexState1 + end}; +maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered, + IndexState) -> + {false, IndexState}. + test_keep_msg_in_ram(#vqstate { target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, q1 = Q1 }) -> @@ -156,7 +211,7 @@ store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, true -> State #vqstate { q4 = queue:in(Entry, Q4) }; false -> - maybe_push_q1_out(State #vqstate { q1 = queue:in(Entry, Q1) }) + maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) }) end. store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) -> @@ -165,18 +220,90 @@ store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) -> false -> State #vqstate { q2 = queue:in(Entry, Q2) } end. -maybe_push_q1_out(State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount - }) when TargetRamMsgCount > RamMsgCount -> +maybe_push_q1_to_betas(State = + #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount + }) when TargetRamMsgCount >= RamMsgCount -> State; -maybe_push_q1_out(State = #vqstate { ram_msg_count = RamMsgCount, q1 = Q1 }) -> - {{value, {msg_and_index, Msg = #basic_message { guid = MsgId }, SeqId, - IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} = queue:out(Q1), - true = case MsgOnDisk of - true -> true; - false -> maybe_write_msg_to_disk(true, Msg) - end, - maybe_push_q1_out( - store_beta_entry({index, MsgId, SeqId, IsDelivered, true, IndexOnDisk}, - State #vqstate { ram_msg_count = RamMsgCount - 1, - q1 = Q1a })). +maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, + q1 = Q1 }) -> + case queue:out(Q1) of + {empty, _Q1} -> State; + {{value, {msg_and_index, Msg = #basic_message { + guid = MsgId, is_persistent = IsPersistent }, + SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} -> + true = case MsgOnDisk of + true -> true; + false -> maybe_write_msg_to_disk(true, Msg) + end, + maybe_push_q1_to_betas( + store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered, + true, IndexOnDisk}, + State #vqstate { ram_msg_count = RamMsgCount - 1, + q1 = Q1a })) + end. + +maybe_push_q4_to_betas(State = + #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount + }) when TargetRamMsgCount >= RamMsgCount -> + State; +maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, + q4 = Q4, q3 = Q3 }) -> + case queue:out_r(Q4) of + {empty, _Q4} -> State; + {{value, {msg_and_index, Msg = #basic_message { + guid = MsgId, is_persistent = IsPersistent }, + SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} -> + true = case MsgOnDisk of + true -> true; + false -> maybe_write_msg_to_disk(true, Msg) + end, + Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered, + true, IndexOnDisk}, Q3), + maybe_push_q4_to_betas( + State #vqstate { ram_msg_count = RamMsgCount - 1, + q3 = Q3a, q4 = Q4a }) + end. + +push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, + index_state = IndexState }) -> + {Len1, Q2a, IndexState1} = + push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), + State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1, + index_state = IndexState1 }, + case queue:out(Q3) of + {empty, _Q3} -> State1; + {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, + true, _IndexOnDisk}}, _Q3a} -> + Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1, + {Len2, Q3b, IndexState2} = + push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), + State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2, + index_state = IndexState2 } + end. + +push_betas_to_gammas(Generator, Limit, Q, IndexState) -> + push_betas_to_gammas(Generator, Limit, Q, 0, IndexState). + +push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> + case Generator(Q) of + {empty, Qa} -> {Count, Qa, IndexState}; + {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered, + _MsgOnDisk, _IndexOnDisk}}, _Qa} -> + {Count, Q, IndexState}; + {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, + true, IndexOnDisk}}, Qa} -> + IndexState1 = + case IndexOnDisk of + true -> IndexState; + false -> + {true, IndexState2} = + maybe_write_index_to_disk( + true, IsPersistent, MsgId, + SeqId, IsDelivered, IndexState), + IndexState2 + end, + push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1) + end. +