Chunk quorum queue deliveries (#7175)

This limits the amount of message data added to the process heap at
any one time to around 128 KB.

Large prefetch values combined with large messages could cause
excessive garbage collection work.

Also simplify the intermediate delivery message format to avoid
unnecessary allocations.
Karl Nilsson 2023-02-27 15:30:20 +00:00 committed by GitHub
parent 74ce1a4bcc
commit cb3407564b
4 changed files with 78 additions and 13 deletions
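
For orientation, the following is a minimal standalone sketch of the chunking idea the commit message describes, assuming per-message sizes are known up front. The module name chunk_sketch, the function chunk/1, and the ?CHUNK_LIMIT_B macro are illustrative only and not part of this commit; the real implementation is rabbit_fifo:chunk_disk_msgs/3 in the first file below, which operates on ?MSG records rather than plain {MsgId, Size} pairs.

%% chunk_sketch.erl -- illustrative only, not part of this commit.
%% Splits a reversed list of {MsgId, Size} pairs into chunks whose
%% accumulated payload size stays around a 128 kB cap, so each delivery
%% only materialises a bounded amount of message data at once.
-module(chunk_sketch).
-export([chunk/1]).

-define(CHUNK_LIMIT_B, 128_000).

chunk(Msgs) ->
    chunk(Msgs, 0, [[]]).

%% input exhausted: drop the empty seed chunk if nothing was added to it
chunk([], _Bytes, [[] | Chunks]) ->
    Chunks;
chunk([], _Bytes, Chunks) ->
    Chunks;
%% current chunk has reached the limit: start a new one with this message
chunk([{_MsgId, Size} = Msg | Rem], Bytes, Chunks)
  when Bytes >= ?CHUNK_LIMIT_B ->
    chunk(Rem, Size, [[Msg] | Chunks]);
%% otherwise keep accumulating into the current chunk
chunk([{_MsgId, Size} = Msg | Rem], Bytes, [Cur | Chunks]) ->
    chunk(Rem, Bytes + Size, [[Msg | Cur] | Chunks]).

For example, chunk_sketch:chunk([{I, 1000} || I <- lists:seq(200, 1, -1)]) returns two chunks: a 72-message "remainder" chunk followed by a full 128-message chunk, mirroring chunk_disk_msgs_test further down.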

View File

@@ -72,7 +72,8 @@
]).
-ifdef(TEST).
-export([update_header/4]).
-export([update_header/4,
chunk_disk_msgs/3]).
-endif.
%% command records representing all the protocol actions that are supported
@@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta,
end.
checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
@@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId,
end,
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = append_delivery_effects(Effects0, SendAcc, State0),
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.
evaluate_limit(_Index, Result, _BeforeState,
@@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.
append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->
%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
Bytes, Chunks)
when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
[CurChunk | Chunks]) ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).
add_delivery_effects(Effects0, AccMap, _State)
when map_size(AccMap) == 0 ->
%% does this ever happen?
Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) ->
[delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef]
add_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Efs)
when is_list(DiskMsgs) ->
lists:foldl(
fun (Msgs, E) ->
[delivery_effect(C, Msgs, State) | E]
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
end, Effects0, AccMap).
take_next_msg(#?MODULE{returns = Returns0,
@@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.
delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}],
delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}],
#?MODULE{msg_cache = {Idx, RawMsg}}) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
[local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
{RaftIdxs, Data} = lists:unzip(Msgs),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{log, RaftIdxs,
fun(Log) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, Header}) ->
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Data),
end, Log, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.

View File

@@ -96,6 +96,7 @@
-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).
-record(consumer_cfg,
{meta = #{} :: consumer_meta(),

View File

@@ -153,7 +153,9 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
cancel_and_consume_with_same_tag,
validate_messages_on_queue
].
memory_tests() ->
@@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) ->
ok.
validate_messages_on_queue(Config) ->
QQ = ?config(queue_name, Config),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Messages = [begin
M = <<I:8000/integer>>,
publish(Ch, QQ, M),
M
end || I <- lists:seq(1, 200)],
amqp_channel:wait_for_confirms_or_die(Ch),
validate_queue(Ch, QQ, Messages),
ok.
leader_locator_client_local(Config) ->

View File

@@ -1971,6 +1971,27 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.
chunk_disk_msgs_test(_Config) ->
%% NB: this does test an internal function
%% input to this function is a reversed list of MSGs
Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)],
Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]),
?assertMatch([_, _], Chunks),
[Chunk1, Chunk2] = Chunks,
?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1),
%% the chunks are worked out in backwards order, hence the first chunk
%% will be a "remainder" chunk
?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2),
?assertEqual(128, length(Chunk2)),
?assertEqual(72, length(Chunk1)),
TwoBigMsgs = [{124, ?MSG(124, 200_000)},
{123, ?MSG(123, 200_000)}],
?assertMatch([[{123, ?MSG(123, 200_000)}],
[{124, ?MSG(124, 200_000)}]],
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).