Make map operations deterministic in quorum queues

Prior to this commit, map iteration order in quorum queues was undefined
and could therefore differ between versions of Erlang/OTP.

Example:

OTP 26.2.5.3
```
Erlang/OTP 26 [erts-14.2.5.3] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit]

Eshell V14.2.5.3 (press Ctrl+G to abort, type help(). for help)
1> maps:foreach(fun(K, _) -> io:format("~b,", [K]) end, maps:from_keys(lists:seq(1, 33), ok)).
4,25,8,1,23,10,7,9,11,12,28,24,13,3,18,29,26,22,19,2,33,21,32,20,17,30,14,5,6,27,16,31,15,ok
```

OTP 27.3.3
```
Erlang/OTP 27 [erts-15.2.6] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit]

Eshell V15.2.6 (press Ctrl+G to abort, type help(). for help)
1> maps:foreach(fun(K, _) -> io:format("~b,", [K]) end, maps:from_keys(lists:seq(1, 33), ok)).
18,4,12,19,29,13,2,7,31,8,10,23,9,15,32,1,25,28,20,6,11,17,24,14,33,3,16,30,21,5,27,26,22,ok
```

This can lead to non-determinism across quorum queue members that run different
Erlang/OTP versions. For example, different members could return messages in a
different order.

This commit introduces a new machine version (6) that fixes this bug.
This commit is contained in:
David Ansari 2025-05-28 17:53:38 +02:00 committed by David Ansari
parent f293c11a04
commit 2db48432d9
3 changed files with 123 additions and 61 deletions

View File

@ -514,7 +514,8 @@ apply(#{index := _Idx}, #garbage_collection{}, State) ->
{State, ok, [{aux, garbage_collection}]};
apply(Meta, {timeout, expire_msgs}, State) ->
checkout(Meta, State, State, []);
apply(#{system_time := Ts} = Meta,
apply(#{machine_version := Vsn,
system_time := Ts} = Meta,
{down, Pid, noconnection},
#?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@ -524,7 +525,7 @@ apply(#{system_time := Ts} = Meta,
%% if the pid refers to an active or cancelled consumer,
%% mark it as suspected and return it to the waiting queue
{State1, Effects0} =
maps:fold(
rabbit_fifo_maps:fold(
fun(CKey, ?CONSUMER_PID(P) = C0, {S0, E0})
when node(P) =:= Node ->
%% the consumer should be returned to waiting
@ -546,7 +547,7 @@ apply(#{system_time := Ts} = Meta,
Effs1};
(_, _, S) ->
S
end, {State0, []}, Cons0),
end, {State0, []}, Cons0, Vsn),
WaitingConsumers = update_waiting_consumer_status(Node, State1,
suspected_down),
@ -561,7 +562,8 @@ apply(#{system_time := Ts} = Meta,
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects);
apply(#{system_time := Ts} = Meta,
apply(#{machine_version := Vsn,
system_time := Ts} = Meta,
{down, Pid, noconnection},
#?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@ -576,7 +578,7 @@ apply(#{system_time := Ts} = Meta,
Node = node(Pid),
{State, Effects1} =
maps:fold(
rabbit_fifo_maps:fold(
fun(CKey, #consumer{cfg = #consumer_cfg{pid = P},
status = up} = C0,
{St0, Eff}) when node(P) =:= Node ->
@ -587,7 +589,7 @@ apply(#{system_time := Ts} = Meta,
{St, Eff1};
(_, _, {St, Eff}) ->
{St, Eff}
end, {State0, []}, Cons0),
end, {State0, []}, Cons0, Vsn),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = suspected_down};
(_, E) -> E
@ -603,15 +605,17 @@ apply(#{system_time := Ts} = Meta,
apply(Meta, {down, Pid, _Info}, State0) ->
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
checkout(Meta, State0, State1, Effects1);
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = _SQ0} = State0) ->
apply(#{machine_version := Vsn} = Meta,
{nodeup, Node},
#?STATE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = _SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
%% actually down or not
Monitors = [{monitor, process, P}
|| P <- suspected_pids_for(Node, State0)],
|| P <- suspected_pids_for(Node, Vsn, State0)],
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = up};
@ -620,17 +624,18 @@ apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
{State1, Effects1} =
maps:fold(fun(ConsumerKey, ?CONSUMER_PID(P) = C, {SAcc, EAcc})
when (node(P) =:= Node) and
(C#consumer.status =/= cancelled) ->
EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerKey,
C, true, up, EAcc),
{update_or_remove_con(Meta, ConsumerKey,
C#consumer{status = up},
SAcc), EAcc1};
(_, _, Acc) ->
Acc
end, {State0, Monitors}, Cons0),
rabbit_fifo_maps:fold(
fun(ConsumerKey, ?CONSUMER_PID(P) = C, {SAcc, EAcc})
when (node(P) =:= Node) and
(C#consumer.status =/= cancelled) ->
EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerKey,
C, true, up, EAcc),
{update_or_remove_con(Meta, ConsumerKey,
C#consumer{status = up},
SAcc), EAcc1};
(_, _, Acc) ->
Acc
end, {State0, Monitors}, Cons0, Vsn),
Waiting = update_waiting_consumer_status(Node, State1, up),
State2 = State1#?STATE{enqueuers = Enqs1,
waiting_consumers = Waiting},
@ -708,27 +713,29 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
msg_cache = rabbit_fifo_v3:get_field(msg_cache, StateV3),
unused_1 = []}.
purge_node(Meta, Node, State, Effects) ->
purge_node(#{machine_version := Vsn} = Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
{S, E} = handle_down(Meta, Pid, S0),
{S, E0 ++ E}
end, {State, Effects},
all_pids_for(Node, State)).
all_pids_for(Node, Vsn, State)).
%% any downs that are not noconnection
handle_down(Meta, Pid, #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
handle_down(#{machine_version := Vsn} = Meta,
Pid, #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the down pid
State1 = State0#?STATE{enqueuers = maps:remove(Pid, Enqs0)},
{Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
% return checked out messages to main queue
% Find the consumers for the down pid
DownConsumers = maps:keys(maps:filter(fun(_CKey, ?CONSUMER_PID(P)) ->
P =:= Pid
end, Cons0)),
DownConsumers = maps:filter(fun(_CKey, ?CONSUMER_PID(P)) ->
P =:= Pid
end, Cons0),
DownConsumerKeys = rabbit_fifo_maps:keys(DownConsumers, Vsn),
lists:foldl(fun(ConsumerKey, {S, E}) ->
cancel_consumer(Meta, ConsumerKey, S, E, down)
end, {State2, Effects1}, DownConsumers).
end, {State2, Effects1}, DownConsumerKeys).
consumer_active_flag_update_function(
#?STATE{cfg = #cfg{consumer_strategy = competing}}) ->
@ -916,14 +923,15 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
end.
-spec version() -> pos_integer().
version() -> 5.
version() -> 6.
which_module(0) -> rabbit_fifo_v0;
which_module(1) -> rabbit_fifo_v1;
which_module(2) -> rabbit_fifo_v3;
which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE.
which_module(5) -> ?MODULE;
which_module(6) -> ?MODULE.
-define(AUX, aux_v3).
@ -2692,41 +2700,45 @@ all_nodes(#?STATE{consumers = Cons0,
Acc#{node(P) => ok}
end, Nodes1, WaitingConsumers0)).
all_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun(_, ?CONSUMER_PID(P), Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, [], Cons0),
Enqs = maps:fold(fun(P, _, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, Cons, Enqs0),
all_pids_for(Node, Vsn, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = rabbit_fifo_maps:fold(fun(_, ?CONSUMER_PID(P), Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) ->
Acc
end, [], Cons0, Vsn),
Enqs = rabbit_fifo_maps:fold(fun(P, _, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) ->
Acc
end, Cons, Enqs0, Vsn),
lists:foldl(fun({_, ?CONSUMER_PID(P)}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
suspected_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun(_Key,
#consumer{cfg = #consumer_cfg{pid = P},
status = suspected_down},
Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, [], Cons0),
Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, Cons, Enqs0),
suspected_pids_for(Node, Vsn, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = rabbit_fifo_maps:fold(fun(_Key,
#consumer{cfg = #consumer_cfg{pid = P},
status = suspected_down},
Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) ->
Acc
end, [], Cons0, Vsn),
Enqs = rabbit_fifo_maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) ->
Acc
end, Cons, Enqs0, Vsn),
lists:foldl(fun({_Key,
#consumer{cfg = #consumer_cfg{pid = P},
status = suspected_down}}, Acc)
@ -2783,7 +2795,10 @@ convert(Meta, 3, To, State) ->
convert(Meta, 4, To, convert_v3_to_v4(Meta, State));
convert(Meta, 4, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 5, To, State).
convert(Meta, 5, To, State);
convert(Meta, 5, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 6, To, State).
smallest_raft_index(#?STATE{messages = Messages,
ra_indexes = Indexes,

View File

@ -1,3 +1,9 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(rabbit_fifo_index).
-export([

41
deps/rabbit/src/rabbit_fifo_maps.erl vendored Normal file
View File

@ -0,0 +1,41 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% Deterministic map operations.
-module(rabbit_fifo_maps).
-export([keys/2,
fold/4]).
%% Returns the keys of `Map'.
%% For machine versions that require deterministic behaviour the keys are
%% sorted in Erlang term order; for older versions they are returned in
%% whatever order maps:keys/1 yields.
-spec keys(Map, ra_machine:version()) -> Keys when
      Map :: #{Key => _},
      Keys :: [Key].
keys(Map, Vsn) ->
    %% Arguments are evaluated left to right, so the keys are extracted
    %% before the version check runs.
    order_keys(maps:keys(Map), is_deterministic(Vsn)).

%% Sorts the key list only when deterministic ordering is requested.
order_keys(Unordered, true) ->
    lists:sort(Unordered);
order_keys(Unordered, false) ->
    Unordered.
%% Folds `Fun' over all associations of `Map', starting from `Init'.
%% For machine versions that require deterministic behaviour the map is
%% traversed through an ordered iterator; for older versions the map is
%% handed to maps:fold/3 directly.
-spec fold(Fun, Init, Map, ra_machine:version()) -> Acc when
      Fun :: fun((Key, Value, AccIn) -> AccOut),
      Init :: term(),
      Acc :: AccOut,
      AccIn :: Init | AccOut,
      Map :: #{Key => Value}.
fold(Fun, Init, Map, Vsn) ->
    maps:fold(Fun, Init, fold_source(is_deterministic(Vsn), Map)).

%% Picks what maps:fold/3 should traverse: an ordered iterator when
%% deterministic ordering is requested, the plain map otherwise.
fold_source(true, Map) ->
    maps:iterator(Map, ordered);
fold_source(false, Map) ->
    Map.
%% Deterministic map operations apply from machine version 6 onwards;
%% versions up to and including 5 keep the previous behaviour.
is_deterministic(Vsn) when is_integer(Vsn) ->
    Vsn >= 6.