From 0f1f27c1dd01fb92cb37da31c8aeeede56bc7bdd Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 9 Aug 2024 09:37:00 +0100 Subject: [PATCH 1/2] Qq: adjust checkpointing algo to something more like it was in 3.13.x. Also add a force_checkpoint aux command that the purge operation emits - this can also be used to try to force a checkpoint --- deps/rabbit/BUILD.bazel | 1 + deps/rabbit/app.bzl | 2 +- deps/rabbit/src/rabbit_fifo.erl | 197 +++++++++++---------- deps/rabbit/src/rabbit_fifo.hrl | 27 +-- deps/rabbit/src/rabbit_quorum_queue.erl | 4 - deps/rabbit/test/rabbit_fifo_SUITE.erl | 37 ++++ deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 10 ++ 7 files changed, 164 insertions(+), 114 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 6d42d7b9f5..f3d4233f36 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -714,6 +714,7 @@ rabbitmq_suite( "@gen_batch_server//:erlang_app", "@meck//:erlang_app", "@ra//:erlang_app", + "//deps/rabbitmq_ct_helpers:erlang_app", ], ) diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 659ef70eb8..7586ab97ad 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1329,7 +1329,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/rabbit_fifo_int_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/rabbit_common:erlang_app"], + deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "rabbit_fifo_prop_SUITE_beam_files", diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 7d357beadc..0763c8beb7 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -192,7 +192,6 @@ init(#{name := Name, update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), Overflow = maps:get(overflow_strategy, Conf, drop_head), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), @@ -206,11 +205,9 @@ update_config(Conf, State) -> competing end, Cfg = State#?STATE.cfg, - RCISpec = {RCI, RCI}, LastActive = maps:get(created, Conf, undefined), - State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, - dead_letter_handler = DLH, + State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH, become_leader_handler = BLH, overflow_strategy = Overflow, max_length = MaxLength, @@ -485,7 +482,7 @@ apply(#{index := Index}, #purge{}, returns = lqueue:new(), msg_bytes_enqueue = 0 }, - Effects0 = [garbage_collection], + Effects0 = [{aux, force_checkpoint}, garbage_collection], Reply = {purge, NumReady}, {State, _, Effects} = evaluate_limit(Index, false, State0, State1, Effects0), @@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta, Effects = [{monitor, node, Node} | Effects1], checkout(Meta, State0, State#?STATE{enqueuers = Enqs, last_active = Ts}, Effects); -apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) -> - {State1, Effects1} = activate_next_consumer( - handle_down(Meta, Pid, State0)), +apply(Meta, {down, Pid, _Info}, State0) -> + {State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)), checkout(Meta, State0, State1, Effects1); apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, enqueuers = Enqs0, @@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) -> end, Returns0)), Messages = rabbit_fifo_q:from_lqueue(Messages0), - #?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3), + Cfg = rabbit_fifo_v3:get_field(cfg, StateV3), + #?STATE{cfg = Cfg#cfg{unused_1 = ?NIL}, messages = Messages, messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3), returns = Returns, @@ -813,8 +810,7 @@ state_enter0(_, _, Effects) -> Effects. -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(Ts, #?STATE{cfg = #cfg{name = _Name, - resource = QName}} = State) -> +tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) -> case is_expired(Ts, State) of true -> [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]; @@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons, waiting_consumers = WaitingConsumers} = State) -> Conf = #{name => Cfg#cfg.name, resource => Cfg#cfg.resource, - release_cursor_interval => Cfg#cfg.release_cursor_interval, dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, max_length => Cfg#cfg.max_length, max_bytes => Cfg#cfg.max_bytes, @@ -908,9 +903,10 @@ which_module(4) -> ?MODULE. -record(checkpoint, {index :: ra:index(), timestamp :: milliseconds(), - enqueue_count :: non_neg_integer(), smallest_index :: undefined | ra:index(), - messages_total :: non_neg_integer()}). + messages_total :: non_neg_integer(), + indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), + unused_1 = ?NIL}). -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), capacity :: term(), @@ -934,8 +930,8 @@ init_aux(Name) when is_atom(Name) -> capacity = {inactive, Now, 1, 1.0}, last_checkpoint = #checkpoint{index = 0, timestamp = erlang:system_time(millisecond), - enqueue_count = 0, - messages_total = 0}}. + messages_total = 0, + unused_1 = ?NIL}}. handle_aux(RaftState, Tag, Cmd, #aux{name = Name, capacity = Cap, @@ -950,6 +946,35 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux) Name = element(2, AuxV2), AuxV3 = init_aux(Name), handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux); +handle_aux(leader, cast, eval, + #?AUX{last_decorators_state = LastDec, + last_checkpoint = Check0} = Aux0, + RaAux) -> + #?STATE{cfg = #cfg{resource = QName}} = MacState = + ra_aux:machine_state(RaAux), + + Ts = erlang:system_time(millisecond), + {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false), + + %% this is called after each batch of commands have been applied + %% set timer for message expire + %% should really be the last applied index ts but this will have to do + Effects1 = timer_effect(Ts, MacState, Effects0), + case query_notify_decorators_info(MacState) of + LastDec -> + {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1}; + {MaxActivePriority, IsEmpty} = NewLast -> + Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty) + | Effects1], + {no_reply, Aux0#?AUX{last_checkpoint = Check, + last_decorators_state = NewLast}, RaAux, Effects} + end; +handle_aux(_RaftState, cast, eval, + #?AUX{last_checkpoint = Check0} = Aux0, + RaAux) -> + Ts = erlang:system_time(millisecond), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false), + {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, consumer_key = Key} = Ret, Corr, Pid}, Aux0, RaAux0) -> @@ -959,18 +984,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, case find_consumer(Key, Consumers) of {ConsumerKey, #consumer{checked_out = Checked}} -> {RaAux, ToReturn} = - maps:fold( - fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) -> - %% it is possible this is not found if the consumer - %% crashed and the message got removed - case ra_aux:log_fetch(Idx, RA0) of - {{_Term, _Meta, Cmd}, RA} -> - Msg = get_msg(Cmd), - {RA, [{MsgId, Idx, Header, Msg} | Acc]}; - {undefined, RA} -> - {RA, Acc} - end - end, {RaAux0, []}, maps:with(MsgIds, Checked)), + maps:fold( + fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) -> + %% it is possible this is not found if the consumer + %% crashed and the message got removed + case ra_aux:log_fetch(Idx, RA0) of + {{_Term, _Meta, Cmd}, RA} -> + Msg = get_msg(Cmd), + {RA, [{MsgId, Idx, Header, Msg} | Acc]}; + {undefined, RA} -> + {RA, Acc} + end + end, {RaAux0, []}, maps:with(MsgIds, Checked)), Appends = make_requeue(ConsumerKey, {notify, Corr, Pid}, lists:sort(ToReturn), []), @@ -1020,35 +1045,6 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) -> _ -> {reply, {error, consumer_not_found}, Aux0, RaAux0} end; -handle_aux(leader, cast, eval, - #?AUX{last_decorators_state = LastDec, - last_checkpoint = Check0} = Aux0, - RaAux) -> - #?STATE{cfg = #cfg{resource = QName}} = MacState = - ra_aux:machine_state(RaAux), - - Ts = erlang:system_time(millisecond), - {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux), - - %% this is called after each batch of commands have been applied - %% set timer for message expire - %% should really be the last applied index ts but this will have to do - Effects1 = timer_effect(Ts, MacState, Effects0), - case query_notify_decorators_info(MacState) of - LastDec -> - {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1}; - {MaxActivePriority, IsEmpty} = NewLast -> - Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty) - | Effects1], - {no_reply, Aux0#?AUX{last_checkpoint = Check, - last_decorators_state = NewLast}, RaAux, Effects} - end; -handle_aux(_RaftState, cast, eval, - #?AUX{last_checkpoint = Check0} = Aux0, - RaAux) -> - Ts = erlang:system_time(millisecond), - {Check, Effects} = do_checkpoints(Ts, Check0, RaAux), - {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; handle_aux(_RaState, cast, Cmd, #?AUX{capacity = Use0} = Aux0, RaAux) when Cmd == active orelse Cmd == inactive -> {no_reply, Aux0#?AUX{capacity = update_use(Use0, Cmd)}, RaAux}; @@ -1107,6 +1103,11 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, end; handle_aux(_, _, garbage_collection, Aux, RaAux) -> {no_reply, force_eval_gc(RaAux, Aux), RaAux}; +handle_aux(_RaState, _, force_checkpoint, + #?AUX{last_checkpoint = Check0} = Aux, RaAux) -> + Ts = erlang:system_time(millisecond), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true), + {no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects}; handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) -> #?STATE{dlx = DlxState, cfg = #cfg{dead_letter_handler = DLH, @@ -2639,8 +2640,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0, end, Enqs, WaitingConsumers0). is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires}, - last_active = LastActive, - consumers = Consumers}) + last_active = LastActive, + consumers = Consumers}) when is_number(LastActive) andalso is_number(Expires) -> %% TODO: should it be active consumers? Active = maps:filter(fun (_, #consumer{status = suspected_down}) -> @@ -2845,53 +2846,53 @@ priority_tag(Msg) -> lo end. --define(CHECK_ENQ_MIN_INTERVAL_MS, 500). --define(CHECK_ENQ_MIN_INDEXES, 4096). --define(CHECK_MIN_INTERVAL_MS, 5000). --define(CHECK_MIN_INDEXES, 65456). do_checkpoints(Ts, #checkpoint{index = ChIdx, timestamp = ChTime, - enqueue_count = ChEnqCnt, smallest_index = LastSmallest, - messages_total = LastMsgsTot} = Check0, RaAux) -> + indexes = MinIndexes} = Check0, RaAux, Force) -> LastAppliedIdx = ra_aux:last_applied(RaAux), - #?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux), + IndexesSince = LastAppliedIdx - ChIdx, + #?STATE{} = MacState = ra_aux:machine_state(RaAux), + TimeSince = Ts - ChTime, + NewSmallest = case smallest_raft_index(MacState) of + undefined -> + LastAppliedIdx; + Smallest -> + Smallest + end, MsgsTot = messages_total(MacState), - Mult = case MsgsTot > 200_000 of - true -> - min(4, MsgsTot div 100_000); - false -> - 1 - end, - Since = Ts - ChTime, - NewSmallest = case smallest_raft_index(MacState) of - undefined -> - LastAppliedIdx; - Smallest -> - Smallest - end, - {Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso - Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse - (LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso - Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse - (LastMsgsTot > 0 andalso MsgsTot == 0) of - true -> - %% take a checkpoint; - {#checkpoint{index = LastAppliedIdx, - timestamp = Ts, - enqueue_count = EnqCnt, - smallest_index = NewSmallest, - messages_total = MsgsTot}, - [{checkpoint, LastAppliedIdx, MacState} | - release_cursor(LastSmallest, NewSmallest)]}; - false -> - {Check0#checkpoint{smallest_index = NewSmallest}, - release_cursor(LastSmallest, NewSmallest)} - end, + {CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} = + persistent_term:get(quorum_queue_checkpoint_config, + {?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES, + ?CHECK_MAX_INDEXES}), + EnoughTimeHasPassed = TimeSince > CheckMinInterval, - {Check, Effects}. + %% enough time has passed and enough indexes have been committed + case (IndexesSince > MinIndexes andalso + EnoughTimeHasPassed) orelse + %% the queue is empty and some commands have been + %% applied since the last checkpoint + (MsgsTot == 0 andalso + IndexesSince > CheckMinIndexes andalso + EnoughTimeHasPassed) orelse + Force of + true -> + %% take fewer checkpoints the more messages there are on queue + NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes), + %% take a checkpoint; + {#checkpoint{index = LastAppliedIdx, + timestamp = Ts, + smallest_index = NewSmallest, + messages_total = MsgsTot, + indexes = NextIndexes}, + [{checkpoint, LastAppliedIdx, MacState} | + release_cursor(LastSmallest, NewSmallest)]}; + false -> + {Check0#checkpoint{smallest_index = NewSmallest}, + release_cursor(LastSmallest, NewSmallest)} + end. release_cursor(LastSmallest, Smallest) when is_integer(LastSmallest) andalso diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index a436b5df8a..3031a1b1d4 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -12,6 +12,8 @@ %% Raw message data is always stored on disk. -define(MSG(Index, Header), ?TUPLE(Index, Header)). +-define(NIL, []). + -define(IS_HEADER(H), (is_integer(H) andalso H >= 0) orelse is_list(H) orelse @@ -97,8 +99,10 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(RELEASE_CURSOR_EVERY, 2048 * 4). --define(RELEASE_CURSOR_EVERY_MAX, 1_000_000). +-define(CHECK_MIN_INTERVAL_MS, 1000). +-define(CHECK_MIN_INDEXES, 4096). +-define(CHECK_MAX_INDEXES, 666_667). + -define(USE_AVG_HALF_LIFE, 10000.0). %% an average QQ without any message uses about 100KB so setting this limit %% to ~10 times that should be relatively safe. @@ -143,20 +147,20 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list - unused, + unused = ?NIL, status = up :: up | suspected_down, %% it is useful to have a record of when this was blocked %% so that we can retry sending the block effect if %% the publisher did not receive the initial one blocked :: option(ra:index()), - unused_1, - unused_2 + unused_1 = ?NIL, + unused_2 = ?NIL }). -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}), + unused_1 = ?NIL, dead_letter_handler :: dead_letter_handler(), become_leader_handler :: option(applied_mfa()), overflow_strategy = drop_head :: drop_head | reject_publish, @@ -168,8 +172,8 @@ delivery_limit :: option(non_neg_integer()), expires :: option(milliseconds()), msg_ttl :: option(milliseconds()), - unused_1, - unused_2 + unused_2 = ?NIL, + unused_3 = ?NIL }). -record(rabbit_fifo, @@ -191,7 +195,7 @@ % index when there are large gaps but should be faster than gb_trees % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), - unused_1, + unused_1 = ?NIL, % consumers need to reflect consumer state at time of snapshot consumers = #{} :: #{consumer_key() => consumer()}, % consumers that require further service are queued here @@ -205,14 +209,15 @@ waiting_consumers = [] :: [{consumer_key(), consumer()}], last_active :: option(non_neg_integer()), msg_cache :: option({ra:index(), raw_msg()}), - unused_2 + unused_2 = ?NIL }). -type config() :: #{name := atom(), queue_resource := rabbit_types:r('queue'), dead_letter_handler => dead_letter_handler(), become_leader_handler => applied_mfa(), - release_cursor_interval => non_neg_integer(), + checkpoint_min_indexes => non_neg_integer(), + checkpoint_max_indexes => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), max_in_memory_length => non_neg_integer(), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e9a492a668..9084c1369a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -315,8 +315,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q), Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), - MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), @@ -326,8 +324,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - max_in_memory_length => MaxMemoryLength, - max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit, overflow_strategy => Overflow, diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 753704affd..a3608f26ef 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -2737,7 +2737,44 @@ modify_test(Config) -> ok. +ttb_test(Config) -> + S0 = init(#{name => ?FUNCTION_NAME, + queue_resource => + rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}), + + + S1 = do_n(5_000_000, + fun (N, Acc) -> + I = (5_000_000 - N), + element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc)) + end, S0), + + + + {T1, _Res} = timer:tc(fun () -> + do_n(100, fun (_, S) -> + term_to_binary(S), + S1 end, S1) + end), + ct:pal("T1 took ~bus", [T1]), + + + {T2, _} = timer:tc(fun () -> + do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1) + end), + ct:pal("T2 took ~bus", [T2]), + + ok. + %% Utility +%% + +do_n(0, _, A) -> + A; +do_n(N, Fun, A0) -> + A = Fun(N, A0), + do_n(N-1, Fun, A). + init(Conf) -> rabbit_fifo:init(Conf). make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 2ae8e4bc55..fae1251d47 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -380,6 +380,16 @@ returns_after_down(Config) -> after 5000 -> ct:fail("waiting for process exit timed out") end, + rabbit_ct_helpers:await_condition( + fun () -> + case ra:member_overview(ServerId) of + {ok, #{machine := #{num_consumers := 0}}, _} -> + true; + X -> + ct:pal("X ~p", [X]), + false + end + end), % message should be available for dequeue {ok, _, {_, _, _, _, Msg1Out}, _} = rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, settled, F2), From 9ca77f8efe4670433e13620ba96f96ae1f791825 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 15 Aug 2024 15:44:28 -0400 Subject: [PATCH 2/2] Remove max_in_memory_length/bytes from QQ config type Also remove a resolved TODO about conversion for the `last_checkpoint` field. --- deps/rabbit/src/rabbit_fifo.erl | 1 - deps/rabbit/src/rabbit_fifo.hrl | 2 -- 2 files changed, 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 0763c8beb7..04c11c2db5 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -917,7 +917,6 @@ which_module(4) -> ?MODULE. gc = #aux_gc{} :: #aux_gc{}, tick_pid :: undefined | pid(), cache = #{} :: map(), - %% TODO: we need a state conversion for this last_checkpoint :: #checkpoint{}}). init_aux(Name) when is_atom(Name) -> diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 3031a1b1d4..f88893374f 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -220,8 +220,6 @@ checkpoint_max_indexes => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), - max_in_memory_length => non_neg_integer(), - max_in_memory_bytes => non_neg_integer(), overflow_strategy => drop_head | reject_publish, single_active_consumer_on => boolean(), delivery_limit => non_neg_integer(),