Merge pull request #14621 from rabbitmq/mergify/bp/v4.1.x/pr-14605

Quorum queues: fix resend issues after network partition (backport #14589) (backport #14605)
This commit is contained in:
Michael Klishin 2025-09-26 13:22:01 -04:00 committed by GitHub
commit 2f47fcb227
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 89 additions and 20 deletions

View File

@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta,
E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
%% send leader change events to all disconnected enqueuers to prompt them
%% to resend any messages stuck during disconnection,
%% ofc it may not be a leader change per se
Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node ->
[{send_msg, P, leader_change, ra_event} | Acc];
(_, _E, Acc) -> Acc
end, Monitors, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
{State1, Effects1} =
@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta,
SAcc), EAcc1};
(_, _, Acc) ->
Acc
end, {State0, Monitors}, Cons0, Vsn),
end, {State0, Effects0}, Cons0, Vsn),
Waiting = update_waiting_consumer_status(Node, State1, up),
State2 = State1#?STATE{enqueuers = Enqs1,
waiting_consumers = Waiting},

View File

@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
#state{queue_status = reject_publish,
cfg = #cfg{}} = State) ->
{reject_publish, State};
enqueue(QName, Correlation, Msg,
enqueue(_QName, Correlation, Msg,
#state{slow = WasSlow,
pending = Pending,
queue_status = go,
@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
next_seq = Seq + 1,
next_enqueue_seq = EnqueueSeq + 1,
slow = IsSlow},
if IsSlow andalso not WasSlow ->
{ok, set_timer(QName, State), [{block, cluster_name(State)}]};
{ok, State, [{block, cluster_name(State)}]};
true ->
{ok, State, []}
end.
@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
when ActualLeader =/= OldLeader ->
%% there is a new leader
?LOG_DEBUG("~ts: Detected QQ leader change (applied) "
"from ~w to ~w, "
"resending ~b pending commands",
[?MODULE, OldLeader, ActualLeader,
maps:size(State1#state.pending)]),
"from ~w to ~w, "
"resending ~b pending commands",
[?MODULE, OldLeader, ActualLeader,
maps:size(State1#state.pending)]),
resend_all_pending(State1#state{leader = ActualLeader});
_ ->
State1
@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
%% we need to update leader
%% and resend any pending commands
?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []};
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
#state{leader = OldLeader,
pending = Pending} = State0) ->
?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, cancel_timer(State), []};
handle_ra_event(_QName, _From,
{rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{ok, State0, []};
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers},
leader = OldLeader,
pending = Pending} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
{ok, set_timer(QName, State0), []};
Leader ->
?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []}
end;
@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
case now_ms() > Last + ?CACHE_SEG_TIMEOUT of
true ->
?LOG_DEBUG("~ts: closing_cached_segments",
[rabbit_misc:rs(QName)]),
[rabbit_misc:rs(QName)]),
%% its been long enough, evict all
_ = ra_flru:evict_all(Cache),
State#state{cached_segments = undefined};
@ -804,12 +811,16 @@ seq_applied({Seq, Response},
{Corrs, Actions0, #state{} = State0}) ->
%% sequences aren't guaranteed to be applied in order as enqueues are
%% low priority commands and may be overtaken by others with a normal priority.
%%
%% if the response is 'not_enqueued' we need to still keep the pending
%% command for a later resend
{Actions, State} = maybe_add_action(Response, Actions0, State0),
case maps:take(Seq, State#state.pending) of
{{undefined, _}, Pending} ->
{{undefined, _}, Pending}
when Response =/= not_enqueued ->
{Corrs, Actions, State#state{pending = Pending}};
{{Corr, _}, Pending}
when Response /= not_enqueued ->
when Response =/= not_enqueued ->
{[Corr | Corrs], Actions, State#state{pending = Pending}};
_ ->
{Corrs, Actions, State}

View File

@ -121,6 +121,7 @@ groups() ->
]},
{clustered_with_partitions, [],
[
partitioned_publisher,
reconnect_consumer_and_publish,
reconnect_consumer_and_wait,
reconnect_consumer_and_wait_channel_down
@ -285,7 +286,8 @@ end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
Testcase == reconnect_consumer_and_publish;
Testcase == reconnect_consumer_and_wait;
Testcase == reconnect_consumer_and_wait_channel_down ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
@ -377,7 +379,8 @@ merge_app_env(Config) ->
{rabbit, [{core_metrics_gc_interval, 100}]}),
{ra, [{min_wal_roll_over_interval, 30000}]}).
end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
Testcase == reconnect_consumer_and_publish;
Testcase == reconnect_consumer_and_wait;
Testcase == reconnect_consumer_and_wait_channel_down ->
Config1 = rabbit_ct_helpers:run_steps(Config,
@ -3011,6 +3014,51 @@ cleanup_data_dir(Config) ->
?awaitMatch(false, filelib:is_dir(DataDir2), 30000),
ok.
partitioned_publisher(Config) ->
[Node0, Node1, Node2] = Nodes =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
{ok, _, {_, Node1}} = ra:members({RaName, Node1}),
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
%% first publish with confirm
publish_confirm(Ch0, QQ),
%% then partition
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1),
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2),
%% check that we can still publish from another channel that is on the
%% majority side
publish_confirm(Ch1, QQ),
%% publish one from partitioned node that will not go through
publish(Ch0, QQ),
%% wait for disconnections
rabbit_ct_helpers:await_condition(
fun() ->
ConnectedNodes = erpc:call(Node0, erlang, nodes, []),
not lists:member(Node1, ConnectedNodes)
end, 30000),
flush(10),
%% then heal the partition
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1),
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2),
publish(Ch0, QQ),
wait_for_messages_ready(Nodes, RaName, 4),
ok.
reconnect_consumer_and_publish(Config) ->
[Server | _] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

View File

@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) ->
RaEvt, F5),
%% this should resend the never applied enqueue
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
{_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7),
?assertEqual(0, rabbit_fifo_client:pending_size(F8)),
flush(),
ok.