Introduce new credit_mode {simple_prefetch, MaxCredits} for v3

In rabbit_fifo Ra machine v3 instead of using credit_mode
simple_prefetch, use credit_mode {simple_prefetch, MaxCredits}.

The goal is to rely less on consumer metadata which is supposed to just be a
map of informational metadata.
We know that the prefetch is part of consumer metadata up until now.
However, the prefetch might not be part anymore of consumer metadata in
a future Ra version.

This commit therefore ensures that:
1. in the conversion from v2 to v3, {simple_prefetch, MaxCredits} is
   set as credit_mode if the consumer uses simple_prefetch, and
2. whenever a new credit_mode is set (in merge_consumer() or
   update_consumer()), ensure that the credit_mode is set correctly if
   the machine runs in v3
This commit is contained in:
David Ansari 2022-08-24 14:34:43 +00:00
parent 03659864bb
commit 530b65fa15
4 changed files with 158 additions and 83 deletions

View File

@ -626,7 +626,7 @@ convert_msg({Header, empty}) ->
convert_msg(Header) when ?IS_HEADER(Header) ->
?MSG(undefined, Header).
convert_consumer({ConsumerTag, Pid}, CV1) ->
convert_consumer_v1_to_v2({ConsumerTag, Pid}, CV1) ->
Meta = element(2, CV1),
CheckedOut = element(3, CV1),
NextMsgId = element(4, CV1),
@ -677,11 +677,11 @@ convert_v1_to_v2(V1State0) ->
end, V2PrefReturns, ReturnsV1),
ConsumersV2 = maps:map(
fun (ConsumerId, CV1) ->
convert_consumer(ConsumerId, CV1)
convert_consumer_v1_to_v2(ConsumerId, CV1)
end, ConsumersV1),
WaitingConsumersV2 = lists:map(
fun ({ConsumerId, CV1}) ->
{ConsumerId, convert_consumer(ConsumerId, CV1)}
{ConsumerId, convert_consumer_v1_to_v2(ConsumerId, CV1)}
end, WaitingConsumersV1),
EnqueuersV1 = rabbit_fifo_v1:get_field(enqueuers, V1State),
EnqueuersV2 = maps:map(fun (_EnqPid, Enq) ->
@ -750,6 +750,18 @@ convert_v1_to_v2(V1State0) ->
last_active = rabbit_fifo_v1:get_field(last_active, V1State)
}.
convert_v2_to_v3(#rabbit_fifo{consumers = ConsumersV2} = StateV2) ->
ConsumersV3 = maps:map(fun(_, C) ->
convert_consumer_v2_to_v3(C)
end, ConsumersV2),
StateV2#rabbit_fifo{consumers = ConsumersV3}.
convert_consumer_v2_to_v3(C = #consumer{cfg = Cfg = #consumer_cfg{credit_mode = simple_prefetch,
meta = #{prefetch := Prefetch}}}) ->
C#consumer{cfg = Cfg#consumer_cfg{credit_mode = {simple_prefetch, Prefetch}}};
convert_consumer_v2_to_v3(C) ->
C.
purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
{S, E} = handle_down(Meta, Pid, S0),
@ -1667,11 +1679,10 @@ increase_credit(_Meta, #consumer{cfg = #consumer_cfg{lifetime = auto,
%% credit_mode: `credited' also doesn't automatically increment credit
Credit;
increase_credit(#{machine_version := MachineVersion},
#consumer{cfg = #consumer_cfg{meta = #{prefetch := Prefetch},
credit_mode = simple_prefetch},
#consumer{cfg = #consumer_cfg{credit_mode = {simple_prefetch, MaxCredit}},
credit = Current}, Credit)
when MachineVersion >= 3, Prefetch > 0 ->
min(Prefetch, Current + Credit);
when MachineVersion >= 3, MaxCredit > 0 ->
min(MaxCredit, Current + Credit);
increase_credit(_Meta, #consumer{credit = Current}, Credit) ->
Current + Credit.
@ -2110,13 +2121,14 @@ uniq_queue_in(_Key, _Consumer, ServiceQueue) ->
ServiceQueue.
update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
{Life, Credit, Mode} = Spec, Priority,
{Life, Credit, Mode0} = Spec, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = competing},
consumers = Cons0} = State0) ->
Consumer = case Cons0 of
#{ConsumerId := #consumer{} = Consumer0} ->
merge_consumer(Consumer0, ConsumerMeta, Spec, Priority);
merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority);
_ ->
Mode = credit_mode(Meta, Credit, Mode0),
#consumer{cfg = #consumer_cfg{tag = Tag,
pid = Pid,
lifetime = Life,
@ -2127,7 +2139,7 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
end,
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
{Life, Credit, Mode} = Spec, Priority,
{Life, Credit, Mode0} = Spec, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
consumers = Cons0,
waiting_consumers = Waiting,
@ -2137,17 +2149,18 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
%% one, then merge
case active_consumer(Cons0) of
{ConsumerId, #consumer{status = up} = Consumer0} ->
Consumer = merge_consumer(Consumer0, ConsumerMeta, Spec, Priority),
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority),
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
undefined when is_map_key(ConsumerId, Cons0) ->
%% there is no active consumer and the current consumer is in the
%% consumers map and thus must be cancelled, in this case we can just
%% merge and effectively make this the current active one
Consumer0 = maps:get(ConsumerId, Cons0),
Consumer = merge_consumer(Consumer0, ConsumerMeta, Spec, Priority),
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority),
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
_ ->
%% add as a new waiting consumer
Mode = credit_mode(Meta, Credit, Mode0),
Consumer = #consumer{cfg = #consumer_cfg{tag = Tag,
pid = Pid,
lifetime = Life,
@ -2159,10 +2172,11 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
State0#?MODULE{waiting_consumers = Waiting ++ [{ConsumerId, Consumer}]}
end.
merge_consumer(#consumer{cfg = CCfg, checked_out = Checked} = Consumer,
ConsumerMeta, {Life, Credit, Mode}, Priority) ->
merge_consumer(Meta, #consumer{cfg = CCfg, checked_out = Checked} = Consumer,
ConsumerMeta, {Life, Credit, Mode0}, Priority) ->
NumChecked = map_size(Checked),
NewCredit = max(0, Credit - NumChecked),
Mode = credit_mode(Meta, Credit, Mode0),
Consumer#consumer{cfg = CCfg#consumer_cfg{priority = Priority,
meta = ConsumerMeta,
credit_mode = Mode,
@ -2170,6 +2184,12 @@ merge_consumer(#consumer{cfg = CCfg, checked_out = Checked} = Consumer,
status = up,
credit = NewCredit}.
credit_mode(#{machine_version := Vsn}, Credit, simple_prefetch)
when Vsn >= 3 ->
{simple_prefetch, Credit};
credit_mode(_, _, Mode) ->
Mode.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
ServiceQueue0) ->
case Credit > 0 of
@ -2391,7 +2411,7 @@ convert(0, To, State) ->
convert(1, To, State) ->
convert(2, To, convert_v1_to_v2(State));
convert(2, To, State) ->
convert(3, To, State).
convert(3, To, convert_v2_to_v3(State)).
smallest_raft_index(#?MODULE{messages = Messages,
ra_indexes = Indexes,

View File

@ -66,7 +66,11 @@
-type consumer_id() :: {consumer_tag(), pid()}.
%% The entity that receives messages. Uniquely identifies a consumer.
-type credit_mode() :: simple_prefetch | credited.
-type credit_mode() :: credited |
%% machine_version 2
simple_prefetch |
%% machine_version 3
{simple_prefetch, MaxCredit :: non_neg_integer()}.
%% determines how credit is replenished
-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
@ -102,7 +106,7 @@
%% or returned.
%% credited: credit can only be changed by receiving a consumer_credit
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
credit_mode :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
priority = 0 :: non_neg_integer()}).

View File

@ -22,7 +22,8 @@
all() ->
[
{group, machine_version_2},
{group, machine_version_3}
{group, machine_version_3},
{group, machine_version_conversion}
].
@ -34,7 +35,8 @@ all_tests() ->
groups() ->
[
{machine_version_2, [], all_tests()},
{machine_version_3, [], all_tests()}
{machine_version_3, [], all_tests()},
{machine_version_conversion, [], [convert_v2_to_v3]}
].
init_per_suite(Config) ->
@ -46,7 +48,9 @@ end_per_suite(_Config) ->
init_per_group(machine_version_2, Config) ->
[{machine_version, 2} | Config];
init_per_group(machine_version_3, Config) ->
[{machine_version, 3} | Config].
[{machine_version, 3} | Config];
init_per_group(machine_version_conversion, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
@ -1715,6 +1719,29 @@ machine_version_waiting_consumer_test(C) ->
?assertEqual(1, priority_queue:len(S)),
ok.
convert_v2_to_v3(Config) ->
ConfigV2 = [{machine_version, 2} | Config],
ConfigV3 = [{machine_version, 3} | Config],
Cid1 = {ctag1, self()},
Cid2 = {ctag2, self()},
MaxCredits = 20,
Entries = [{1, rabbit_fifo:make_checkout(Cid1, {auto, 10, credited}, #{})},
{2, rabbit_fifo:make_checkout(Cid2, {auto, MaxCredits, simple_prefetch},
#{prefetch => MaxCredits})}],
%% run log in v2
{State, _} = run_log(ConfigV2, test_init(?FUNCTION_NAME), Entries),
%% convert from v2 to v3
{#rabbit_fifo{consumers = Consumers}, ok, _} =
apply(meta(ConfigV3, 3), {machine_version, 2, 3}, State),
?assertEqual(2, maps:size(Consumers)),
?assertMatch(#consumer{cfg = #consumer_cfg{credit_mode = {simple_prefetch, MaxCredits}}},
maps:get(Cid2, Consumers)),
ok.
queue_ttl_test(C) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
Conf = #{name => ?FUNCTION_NAME,

View File

@ -61,11 +61,11 @@ all_tests() ->
scenario30,
scenario31,
scenario32,
v2_v3,
upgrade,
upgrade_snapshots,
upgrade_snapshots_scenario1,
upgrade_snapshots_scenario2,
upgrade_snapshots_v2_to_v3,
messages_total,
simple_prefetch,
simple_prefetch_without_checkout_cancel,
@ -923,33 +923,6 @@ single_active(_Config) ->
end)
end, [], Size).
v2_v3(_Config) ->
Size = 700,
run_proper(
fun () ->
?FORALL({Length, Bytes, DeliveryLimit, InMemoryLength, SingleActive},
frequency([{5, {undefined, undefined, undefined, undefined, false}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), 0, undefined]),
oneof([true, false])
}}]),
begin
Config = config(?FUNCTION_NAME,
Length,
Bytes,
SingleActive,
DeliveryLimit,
InMemoryLength,
undefined
),
?FORALL(O, ?LET(Ops, log_gen_v2_v3(Size), expand(Ops, Config)),
collect({log_size, length(O)},
v2_v3_prop(Config, O)))
end)
end, [], Size).
upgrade(_Config) ->
Size = 500,
run_proper(
@ -1008,6 +981,32 @@ upgrade_snapshots(_Config) ->
end)
end, [], Size).
upgrade_snapshots_v2_to_v3(_Config) ->
Size = 500,
run_proper(
fun () ->
?FORALL({Length, Bytes, DeliveryLimit, SingleActive},
frequency([{5, {undefined, undefined, undefined, false}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
oneof([range(1, 3), undefined]),
oneof([true, false])
}}]),
begin
Config = config(?FUNCTION_NAME,
Length,
Bytes,
SingleActive,
DeliveryLimit,
undefined,
undefined
),
?FORALL(O, ?LET(Ops, log_gen_upgrade_snapshots_v2_to_v3(Size), expand(Ops, Config)),
collect({log_size, length(O)},
upgrade_snapshots_prop_v2_to_v3(Config, O)))
end)
end, [], Size).
messages_total(_Config) ->
Size = 1000,
run_proper(
@ -1653,11 +1652,10 @@ simple_prefetch_invariant(WithCheckoutCancel) ->
maps:fold(
fun(_, _, false) ->
false;
(Id, #consumer{cfg = #consumer_cfg{meta = #{prefetch := Prefetch},
credit_mode = simple_prefetch},
(Id, #consumer{cfg = #consumer_cfg{credit_mode = {simple_prefetch, MaxCredit}},
checked_out = CheckedOut,
credit = Credit}, true) ->
valid_simple_prefetch(Prefetch, Credit, maps:size(CheckedOut), WithCheckoutCancel, Id)
valid_simple_prefetch(MaxCredit, Credit, maps:size(CheckedOut), WithCheckoutCancel, Id)
end, true, Consumers)
end.
@ -1683,37 +1681,6 @@ valid_simple_prefetch(Prefetch, _, CheckedOut, false, CId)
valid_simple_prefetch(_, _, _, _, _) ->
true.
v2_v3_prop(Conf0, Commands) ->
Conf = Conf0#{release_cursor_interval => 0},
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
InitState = test_init(Conf),
%% run log v2
{V2, V2Effs} = run_log(InitState, Entries, fun (_) -> true end,
rabbit_fifo, 2),
%% run log v3
{V3, V3Effs} = run_log(InitState, Entries, fun (_) -> true end,
rabbit_fifo, 3),
%% We expect machine versions v2 and v3 to be exactly the same
%% when no "return", "down", or "cancel consumer" Ra commands are used.
case V2 =:= V3 of
true ->
ok;
false ->
ct:pal("v2_v3_prop failed~nExpected:~n~p~nGot:~n~p",
[V2, V3]),
?assertEqual(V2, V3)
end,
case V2Effs =:= V3Effs of
true ->
ok;
false ->
ct:pal("v2_v3_prop failed~nExpected:~n~p~nGot:~n~p",
[V2Effs, V3Effs]),
?assertEqual(V2Effs, V3Effs)
end,
true.
upgrade_prop(Conf0, Commands) ->
Conf = Conf0#{release_cursor_interval => 0},
Indexes = lists:seq(1, length(Commands)),
@ -1826,6 +1793,16 @@ upgrade_snapshots_prop(Conf, Commands) ->
false
end.
upgrade_snapshots_prop_v2_to_v3(Conf, Commands) ->
try run_upgrade_snapshot_test_v2_to_v3(Conf, Commands) of
_ -> true
catch
Err ->
ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
ct:pal("Err: ~p~n", [Err]),
false
end.
log_gen(Size) ->
Nodes = [node(),
fakenode@fake,
@ -1852,7 +1829,7 @@ log_gen(Size) ->
%% Does not use "return", "down", or "checkout cancel" Ra commands
%% since these 3 commands change behaviour across v2 and v3 fixing
%% a bug where to many credits are granted to the consumer.
log_gen_v2_v3(Size) ->
log_gen_upgrade_snapshots_v2_to_v3(Size) ->
Nodes = [node(),
fakenode@fake,
fakenode@fake2
@ -2351,6 +2328,53 @@ run_upgrade_snapshot_test(Conf, Commands) ->
end || {release_cursor, SnapIdx, SnapState} <- Cursors],
ok.
run_upgrade_snapshot_test_v2_to_v3(Conf, Commands) ->
ct:pal("running test with ~b commands using config ~p",
[length(Commands), Conf]),
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
Invariant = fun(_) -> true end,
%% Run the whole command log in v2 to emit release cursors.
{_, Effects} = run_log(test_init(Conf), Entries, Invariant, rabbit_fifo, 2),
Cursors = [ C || {release_cursor, _, _} = C <- Effects],
[begin
%% Drop all entries below and including the snapshot.
FilteredV2 = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
%% For V3 we will apply the same commands to the snapshot state as for V2.
%% However, we need to increment all Raft indexes by 1 because V3
%% requires one additional Raft index for the conversion command from V2 to V3.
FilteredV3 = lists:keymap(fun(Idx) -> Idx + 1 end, 1, FilteredV2),
%% Recover in V2.
{StateV2, _} = run_log(SnapState, FilteredV2, Invariant, rabbit_fifo, 2),
%% Perform conversion and recover in V3.
Res = rabbit_fifo:apply(meta(SnapIdx + 1), {machine_version, 2, 3}, SnapState),
#rabbit_fifo{} = V3 = element(1, Res),
{StateV3, _} = run_log(V3, FilteredV3, Invariant, rabbit_fifo, 3),
%% Invariant: Recovering a V2 snapshot in V2 or V3 should end up in the same
%% number of messages given that no "return", "down", or "cancel consumer"
%% Ra commands are used.
Fields = [num_messages,
num_ready_messages,
num_enqueuers,
num_consumers,
enqueue_message_bytes,
checkout_message_bytes
],
V2Overview = maps:with(Fields, rabbit_fifo:overview(StateV2)),
V3Overview = maps:with(Fields, rabbit_fifo:overview(StateV3)),
case V2Overview == V3Overview of
true -> ok;
false ->
ct:pal("property failed, expected:~n~p~ngot:~n~p~nstate v2:~n~p~nstate v3:~n~p~n"
"snapshot index: ~p",
[V2Overview, V3Overview, StateV2, ?record_info(rabbit_fifo, StateV3), SnapIdx]),
?assertEqual(V2Overview, V3Overview)
end
end || {release_cursor, SnapIdx, SnapState} <- Cursors],
ok.
hd_or([H | _]) -> H;
hd_or(_) -> {undefined}.