Merge pull request #11964 from rabbitmq/qq-checkpointing-tweaks
QQ: checkpointing frequency improvements
This commit is contained in:
commit
178f9a962e
|
|
@ -714,6 +714,7 @@ rabbitmq_suite(
|
|||
"@gen_batch_server//:erlang_app",
|
||||
"@meck//:erlang_app",
|
||||
"@ra//:erlang_app",
|
||||
"//deps/rabbitmq_ct_helpers:erlang_app",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -1332,7 +1332,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
outs = ["test/rabbit_fifo_int_SUITE.beam"],
|
||||
app_name = "rabbit",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/rabbit_common:erlang_app"],
|
||||
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "rabbit_fifo_prop_SUITE_beam_files",
|
||||
|
|
|
|||
|
|
@ -192,7 +192,6 @@ init(#{name := Name,
|
|||
update_config(Conf, State) ->
|
||||
DLH = maps:get(dead_letter_handler, Conf, undefined),
|
||||
BLH = maps:get(become_leader_handler, Conf, undefined),
|
||||
RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
|
||||
Overflow = maps:get(overflow_strategy, Conf, drop_head),
|
||||
MaxLength = maps:get(max_length, Conf, undefined),
|
||||
MaxBytes = maps:get(max_bytes, Conf, undefined),
|
||||
|
|
@ -206,11 +205,9 @@ update_config(Conf, State) ->
|
|||
competing
|
||||
end,
|
||||
Cfg = State#?STATE.cfg,
|
||||
RCISpec = {RCI, RCI},
|
||||
|
||||
LastActive = maps:get(created, Conf, undefined),
|
||||
State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
|
||||
dead_letter_handler = DLH,
|
||||
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
|
||||
become_leader_handler = BLH,
|
||||
overflow_strategy = Overflow,
|
||||
max_length = MaxLength,
|
||||
|
|
@ -485,7 +482,7 @@ apply(#{index := Index}, #purge{},
|
|||
returns = lqueue:new(),
|
||||
msg_bytes_enqueue = 0
|
||||
},
|
||||
Effects0 = [garbage_collection],
|
||||
Effects0 = [{aux, force_checkpoint}, garbage_collection],
|
||||
Reply = {purge, NumReady},
|
||||
{State, _, Effects} = evaluate_limit(Index, false, State0,
|
||||
State1, Effects0),
|
||||
|
|
@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta,
|
|||
Effects = [{monitor, node, Node} | Effects1],
|
||||
checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
|
||||
last_active = Ts}, Effects);
|
||||
apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) ->
|
||||
{State1, Effects1} = activate_next_consumer(
|
||||
handle_down(Meta, Pid, State0)),
|
||||
apply(Meta, {down, Pid, _Info}, State0) ->
|
||||
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
|
||||
checkout(Meta, State0, State1, Effects1);
|
||||
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
|
||||
enqueuers = Enqs0,
|
||||
|
|
@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
|
|||
end, Returns0)),
|
||||
|
||||
Messages = rabbit_fifo_q:from_lqueue(Messages0),
|
||||
#?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
|
||||
Cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
|
||||
#?STATE{cfg = Cfg#cfg{unused_1 = ?NIL},
|
||||
messages = Messages,
|
||||
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
|
||||
returns = Returns,
|
||||
|
|
@ -813,8 +810,7 @@ state_enter0(_, _, Effects) ->
|
|||
Effects.
|
||||
|
||||
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
|
||||
tick(Ts, #?STATE{cfg = #cfg{name = _Name,
|
||||
resource = QName}} = State) ->
|
||||
tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
|
||||
case is_expired(Ts, State) of
|
||||
true ->
|
||||
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
|
||||
|
|
@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons,
|
|||
waiting_consumers = WaitingConsumers} = State) ->
|
||||
Conf = #{name => Cfg#cfg.name,
|
||||
resource => Cfg#cfg.resource,
|
||||
release_cursor_interval => Cfg#cfg.release_cursor_interval,
|
||||
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
|
||||
max_length => Cfg#cfg.max_length,
|
||||
max_bytes => Cfg#cfg.max_bytes,
|
||||
|
|
@ -908,9 +903,10 @@ which_module(4) -> ?MODULE.
|
|||
|
||||
-record(checkpoint, {index :: ra:index(),
|
||||
timestamp :: milliseconds(),
|
||||
enqueue_count :: non_neg_integer(),
|
||||
smallest_index :: undefined | ra:index(),
|
||||
messages_total :: non_neg_integer()}).
|
||||
messages_total :: non_neg_integer(),
|
||||
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
|
||||
unused_1 = ?NIL}).
|
||||
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
|
||||
-record(aux, {name :: atom(),
|
||||
capacity :: term(),
|
||||
|
|
@ -921,7 +917,6 @@ which_module(4) -> ?MODULE.
|
|||
gc = #aux_gc{} :: #aux_gc{},
|
||||
tick_pid :: undefined | pid(),
|
||||
cache = #{} :: map(),
|
||||
%% TODO: we need a state conversion for this
|
||||
last_checkpoint :: #checkpoint{}}).
|
||||
|
||||
init_aux(Name) when is_atom(Name) ->
|
||||
|
|
@ -934,8 +929,8 @@ init_aux(Name) when is_atom(Name) ->
|
|||
capacity = {inactive, Now, 1, 1.0},
|
||||
last_checkpoint = #checkpoint{index = 0,
|
||||
timestamp = erlang:system_time(millisecond),
|
||||
enqueue_count = 0,
|
||||
messages_total = 0}}.
|
||||
messages_total = 0,
|
||||
unused_1 = ?NIL}}.
|
||||
|
||||
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
|
||||
capacity = Cap,
|
||||
|
|
@ -950,6 +945,35 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
|
|||
Name = element(2, AuxV2),
|
||||
AuxV3 = init_aux(Name),
|
||||
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
|
||||
handle_aux(leader, cast, eval,
|
||||
#?AUX{last_decorators_state = LastDec,
|
||||
last_checkpoint = Check0} = Aux0,
|
||||
RaAux) ->
|
||||
#?STATE{cfg = #cfg{resource = QName}} = MacState =
|
||||
ra_aux:machine_state(RaAux),
|
||||
|
||||
Ts = erlang:system_time(millisecond),
|
||||
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),
|
||||
|
||||
%% this is called after each batch of commands have been applied
|
||||
%% set timer for message expire
|
||||
%% should really be the last applied index ts but this will have to do
|
||||
Effects1 = timer_effect(Ts, MacState, Effects0),
|
||||
case query_notify_decorators_info(MacState) of
|
||||
LastDec ->
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
|
||||
{MaxActivePriority, IsEmpty} = NewLast ->
|
||||
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
|
||||
| Effects1],
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check,
|
||||
last_decorators_state = NewLast}, RaAux, Effects}
|
||||
end;
|
||||
handle_aux(_RaftState, cast, eval,
|
||||
#?AUX{last_checkpoint = Check0} = Aux0,
|
||||
RaAux) ->
|
||||
Ts = erlang:system_time(millisecond),
|
||||
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
|
||||
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
|
||||
consumer_key = Key} = Ret, Corr, Pid},
|
||||
Aux0, RaAux0) ->
|
||||
|
|
@ -959,18 +983,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
|
|||
case find_consumer(Key, Consumers) of
|
||||
{ConsumerKey, #consumer{checked_out = Checked}} ->
|
||||
{RaAux, ToReturn} =
|
||||
maps:fold(
|
||||
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
|
||||
%% it is possible this is not found if the consumer
|
||||
%% crashed and the message got removed
|
||||
case ra_aux:log_fetch(Idx, RA0) of
|
||||
{{_Term, _Meta, Cmd}, RA} ->
|
||||
Msg = get_msg(Cmd),
|
||||
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
|
||||
{undefined, RA} ->
|
||||
{RA, Acc}
|
||||
end
|
||||
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
|
||||
maps:fold(
|
||||
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
|
||||
%% it is possible this is not found if the consumer
|
||||
%% crashed and the message got removed
|
||||
case ra_aux:log_fetch(Idx, RA0) of
|
||||
{{_Term, _Meta, Cmd}, RA} ->
|
||||
Msg = get_msg(Cmd),
|
||||
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
|
||||
{undefined, RA} ->
|
||||
{RA, Acc}
|
||||
end
|
||||
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
|
||||
|
||||
Appends = make_requeue(ConsumerKey, {notify, Corr, Pid},
|
||||
lists:sort(ToReturn), []),
|
||||
|
|
@ -1020,35 +1044,6 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
|
|||
_ ->
|
||||
{reply, {error, consumer_not_found}, Aux0, RaAux0}
|
||||
end;
|
||||
handle_aux(leader, cast, eval,
|
||||
#?AUX{last_decorators_state = LastDec,
|
||||
last_checkpoint = Check0} = Aux0,
|
||||
RaAux) ->
|
||||
#?STATE{cfg = #cfg{resource = QName}} = MacState =
|
||||
ra_aux:machine_state(RaAux),
|
||||
|
||||
Ts = erlang:system_time(millisecond),
|
||||
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux),
|
||||
|
||||
%% this is called after each batch of commands have been applied
|
||||
%% set timer for message expire
|
||||
%% should really be the last applied index ts but this will have to do
|
||||
Effects1 = timer_effect(Ts, MacState, Effects0),
|
||||
case query_notify_decorators_info(MacState) of
|
||||
LastDec ->
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
|
||||
{MaxActivePriority, IsEmpty} = NewLast ->
|
||||
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
|
||||
| Effects1],
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check,
|
||||
last_decorators_state = NewLast}, RaAux, Effects}
|
||||
end;
|
||||
handle_aux(_RaftState, cast, eval,
|
||||
#?AUX{last_checkpoint = Check0} = Aux0,
|
||||
RaAux) ->
|
||||
Ts = erlang:system_time(millisecond),
|
||||
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux),
|
||||
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
|
||||
handle_aux(_RaState, cast, Cmd, #?AUX{capacity = Use0} = Aux0, RaAux)
|
||||
when Cmd == active orelse Cmd == inactive ->
|
||||
{no_reply, Aux0#?AUX{capacity = update_use(Use0, Cmd)}, RaAux};
|
||||
|
|
@ -1107,6 +1102,11 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
|
|||
end;
|
||||
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
|
||||
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
|
||||
handle_aux(_RaState, _, force_checkpoint,
|
||||
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
|
||||
Ts = erlang:system_time(millisecond),
|
||||
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
|
||||
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
|
||||
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
|
||||
#?STATE{dlx = DlxState,
|
||||
cfg = #cfg{dead_letter_handler = DLH,
|
||||
|
|
@ -2639,8 +2639,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
|
|||
end, Enqs, WaitingConsumers0).
|
||||
|
||||
is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
|
||||
last_active = LastActive,
|
||||
consumers = Consumers})
|
||||
last_active = LastActive,
|
||||
consumers = Consumers})
|
||||
when is_number(LastActive) andalso is_number(Expires) ->
|
||||
%% TODO: should it be active consumers?
|
||||
Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
|
||||
|
|
@ -2845,53 +2845,53 @@ priority_tag(Msg) ->
|
|||
lo
|
||||
end.
|
||||
|
||||
-define(CHECK_ENQ_MIN_INTERVAL_MS, 500).
|
||||
-define(CHECK_ENQ_MIN_INDEXES, 4096).
|
||||
-define(CHECK_MIN_INTERVAL_MS, 5000).
|
||||
-define(CHECK_MIN_INDEXES, 65456).
|
||||
|
||||
do_checkpoints(Ts,
|
||||
#checkpoint{index = ChIdx,
|
||||
timestamp = ChTime,
|
||||
enqueue_count = ChEnqCnt,
|
||||
smallest_index = LastSmallest,
|
||||
messages_total = LastMsgsTot} = Check0, RaAux) ->
|
||||
indexes = MinIndexes} = Check0, RaAux, Force) ->
|
||||
LastAppliedIdx = ra_aux:last_applied(RaAux),
|
||||
#?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux),
|
||||
IndexesSince = LastAppliedIdx - ChIdx,
|
||||
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
|
||||
TimeSince = Ts - ChTime,
|
||||
NewSmallest = case smallest_raft_index(MacState) of
|
||||
undefined ->
|
||||
LastAppliedIdx;
|
||||
Smallest ->
|
||||
Smallest
|
||||
end,
|
||||
MsgsTot = messages_total(MacState),
|
||||
Mult = case MsgsTot > 200_000 of
|
||||
true ->
|
||||
min(4, MsgsTot div 100_000);
|
||||
false ->
|
||||
1
|
||||
end,
|
||||
Since = Ts - ChTime,
|
||||
NewSmallest = case smallest_raft_index(MacState) of
|
||||
undefined ->
|
||||
LastAppliedIdx;
|
||||
Smallest ->
|
||||
Smallest
|
||||
end,
|
||||
{Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso
|
||||
Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse
|
||||
(LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso
|
||||
Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse
|
||||
(LastMsgsTot > 0 andalso MsgsTot == 0) of
|
||||
true ->
|
||||
%% take a checkpoint;
|
||||
{#checkpoint{index = LastAppliedIdx,
|
||||
timestamp = Ts,
|
||||
enqueue_count = EnqCnt,
|
||||
smallest_index = NewSmallest,
|
||||
messages_total = MsgsTot},
|
||||
[{checkpoint, LastAppliedIdx, MacState} |
|
||||
release_cursor(LastSmallest, NewSmallest)]};
|
||||
false ->
|
||||
{Check0#checkpoint{smallest_index = NewSmallest},
|
||||
release_cursor(LastSmallest, NewSmallest)}
|
||||
end,
|
||||
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
|
||||
persistent_term:get(quorum_queue_checkpoint_config,
|
||||
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
|
||||
?CHECK_MAX_INDEXES}),
|
||||
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
|
||||
|
||||
{Check, Effects}.
|
||||
%% enough time has passed and enough indexes have been committed
|
||||
case (IndexesSince > MinIndexes andalso
|
||||
EnoughTimeHasPassed) orelse
|
||||
%% the queue is empty and some commands have been
|
||||
%% applied since the last checkpoint
|
||||
(MsgsTot == 0 andalso
|
||||
IndexesSince > CheckMinIndexes andalso
|
||||
EnoughTimeHasPassed) orelse
|
||||
Force of
|
||||
true ->
|
||||
%% take fewer checkpoints the more messages there are on queue
|
||||
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
|
||||
%% take a checkpoint;
|
||||
{#checkpoint{index = LastAppliedIdx,
|
||||
timestamp = Ts,
|
||||
smallest_index = NewSmallest,
|
||||
messages_total = MsgsTot,
|
||||
indexes = NextIndexes},
|
||||
[{checkpoint, LastAppliedIdx, MacState} |
|
||||
release_cursor(LastSmallest, NewSmallest)]};
|
||||
false ->
|
||||
{Check0#checkpoint{smallest_index = NewSmallest},
|
||||
release_cursor(LastSmallest, NewSmallest)}
|
||||
end.
|
||||
|
||||
release_cursor(LastSmallest, Smallest)
|
||||
when is_integer(LastSmallest) andalso
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@
|
|||
%% Raw message data is always stored on disk.
|
||||
-define(MSG(Index, Header), ?TUPLE(Index, Header)).
|
||||
|
||||
-define(NIL, []).
|
||||
|
||||
-define(IS_HEADER(H),
|
||||
(is_integer(H) andalso H >= 0) orelse
|
||||
is_list(H) orelse
|
||||
|
|
@ -97,8 +99,10 @@
|
|||
-type applied_mfa() :: {module(), atom(), list()}.
|
||||
% represents a partially applied module call
|
||||
|
||||
-define(RELEASE_CURSOR_EVERY, 2048 * 4).
|
||||
-define(RELEASE_CURSOR_EVERY_MAX, 1_000_000).
|
||||
-define(CHECK_MIN_INTERVAL_MS, 1000).
|
||||
-define(CHECK_MIN_INDEXES, 4096).
|
||||
-define(CHECK_MAX_INDEXES, 666_667).
|
||||
|
||||
-define(USE_AVG_HALF_LIFE, 10000.0).
|
||||
%% an average QQ without any message uses about 100KB so setting this limit
|
||||
%% to ~10 times that should be relatively safe.
|
||||
|
|
@ -143,20 +147,20 @@
|
|||
-record(enqueuer,
|
||||
{next_seqno = 1 :: msg_seqno(),
|
||||
% out of order enqueues - sorted list
|
||||
unused,
|
||||
unused = ?NIL,
|
||||
status = up :: up | suspected_down,
|
||||
%% it is useful to have a record of when this was blocked
|
||||
%% so that we can retry sending the block effect if
|
||||
%% the publisher did not receive the initial one
|
||||
blocked :: option(ra:index()),
|
||||
unused_1,
|
||||
unused_2
|
||||
unused_1 = ?NIL,
|
||||
unused_2 = ?NIL
|
||||
}).
|
||||
|
||||
-record(cfg,
|
||||
{name :: atom(),
|
||||
resource :: rabbit_types:r('queue'),
|
||||
release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
|
||||
unused_1 = ?NIL,
|
||||
dead_letter_handler :: dead_letter_handler(),
|
||||
become_leader_handler :: option(applied_mfa()),
|
||||
overflow_strategy = drop_head :: drop_head | reject_publish,
|
||||
|
|
@ -168,8 +172,8 @@
|
|||
delivery_limit :: option(non_neg_integer()),
|
||||
expires :: option(milliseconds()),
|
||||
msg_ttl :: option(milliseconds()),
|
||||
unused_1,
|
||||
unused_2
|
||||
unused_2 = ?NIL,
|
||||
unused_3 = ?NIL
|
||||
}).
|
||||
|
||||
-record(rabbit_fifo,
|
||||
|
|
@ -191,7 +195,7 @@
|
|||
% index when there are large gaps but should be faster than gb_trees
|
||||
% for normal appending operations as it's backed by a map
|
||||
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
|
||||
unused_1,
|
||||
unused_1 = ?NIL,
|
||||
% consumers need to reflect consumer state at time of snapshot
|
||||
consumers = #{} :: #{consumer_key() => consumer()},
|
||||
% consumers that require further service are queued here
|
||||
|
|
@ -205,18 +209,17 @@
|
|||
waiting_consumers = [] :: [{consumer_key(), consumer()}],
|
||||
last_active :: option(non_neg_integer()),
|
||||
msg_cache :: option({ra:index(), raw_msg()}),
|
||||
unused_2
|
||||
unused_2 = ?NIL
|
||||
}).
|
||||
|
||||
-type config() :: #{name := atom(),
|
||||
queue_resource := rabbit_types:r('queue'),
|
||||
dead_letter_handler => dead_letter_handler(),
|
||||
become_leader_handler => applied_mfa(),
|
||||
release_cursor_interval => non_neg_integer(),
|
||||
checkpoint_min_indexes => non_neg_integer(),
|
||||
checkpoint_max_indexes => non_neg_integer(),
|
||||
max_length => non_neg_integer(),
|
||||
max_bytes => non_neg_integer(),
|
||||
max_in_memory_length => non_neg_integer(),
|
||||
max_in_memory_bytes => non_neg_integer(),
|
||||
overflow_strategy => drop_head | reject_publish,
|
||||
single_active_consumer_on => boolean(),
|
||||
delivery_limit => non_neg_integer(),
|
||||
|
|
|
|||
|
|
@ -320,8 +320,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
|
|||
OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q),
|
||||
Overflow = overflow(OverflowBin, drop_head, QName),
|
||||
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
|
||||
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
|
||||
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
|
||||
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
|
||||
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
|
||||
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
|
||||
|
|
@ -331,8 +329,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
|
|||
become_leader_handler => {?MODULE, become_leader, [QName]},
|
||||
max_length => MaxLength,
|
||||
max_bytes => MaxBytes,
|
||||
max_in_memory_length => MaxMemoryLength,
|
||||
max_in_memory_bytes => MaxMemoryBytes,
|
||||
single_active_consumer_on => single_active_consumer_on(Q),
|
||||
delivery_limit => DeliveryLimit,
|
||||
overflow_strategy => Overflow,
|
||||
|
|
|
|||
|
|
@ -2737,7 +2737,44 @@ modify_test(Config) ->
|
|||
|
||||
ok.
|
||||
|
||||
ttb_test(Config) ->
|
||||
S0 = init(#{name => ?FUNCTION_NAME,
|
||||
queue_resource =>
|
||||
rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}),
|
||||
|
||||
|
||||
S1 = do_n(5_000_000,
|
||||
fun (N, Acc) ->
|
||||
I = (5_000_000 - N),
|
||||
element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc))
|
||||
end, S0),
|
||||
|
||||
|
||||
|
||||
{T1, _Res} = timer:tc(fun () ->
|
||||
do_n(100, fun (_, S) ->
|
||||
term_to_binary(S),
|
||||
S1 end, S1)
|
||||
end),
|
||||
ct:pal("T1 took ~bus", [T1]),
|
||||
|
||||
|
||||
{T2, _} = timer:tc(fun () ->
|
||||
do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1)
|
||||
end),
|
||||
ct:pal("T2 took ~bus", [T2]),
|
||||
|
||||
ok.
|
||||
|
||||
%% Utility
|
||||
%%
|
||||
|
||||
do_n(0, _, A) ->
|
||||
A;
|
||||
do_n(N, Fun, A0) ->
|
||||
A = Fun(N, A0),
|
||||
do_n(N-1, Fun, A).
|
||||
|
||||
|
||||
init(Conf) -> rabbit_fifo:init(Conf).
|
||||
make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
|
||||
|
|
|
|||
|
|
@ -380,6 +380,16 @@ returns_after_down(Config) ->
|
|||
after 5000 ->
|
||||
ct:fail("waiting for process exit timed out")
|
||||
end,
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun () ->
|
||||
case ra:member_overview(ServerId) of
|
||||
{ok, #{machine := #{num_consumers := 0}}, _} ->
|
||||
true;
|
||||
X ->
|
||||
ct:pal("X ~p", [X]),
|
||||
false
|
||||
end
|
||||
end),
|
||||
% message should be available for dequeue
|
||||
{ok, _, {_, _, _, _, Msg1Out}, _} =
|
||||
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, settled, F2),
|
||||
|
|
|
|||
Loading…
Reference in New Issue