From 0c2b6a1cb3884ad0959dcc9884152c53a985ac0d Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 22 May 2025 15:52:41 +0200 Subject: [PATCH] Force checkpoint in all members --- deps/rabbit/src/rabbit_quorum_queue.erl | 6 ++++-- deps/rabbit/test/quorum_queue_SUITE.erl | 28 ++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 08919859e0..9c0e7fd9ca 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2119,7 +2119,6 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis ok. force_checkpoint_on_queue(QName) -> - Node = node(), QNameFmt = rabbit_misc:rs(QName), case rabbit_db_queue:get_durable(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -2127,7 +2126,10 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT); + Nodes = amqqueue:get_nodes(Q), + _ = [ra:cast_aux_command({RaName, Node}, force_checkpoint) + || Node <- Nodes], + ok; {ok, _Q} -> {error, not_quorum_queue}; {error, _} = E -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 0e5304856c..f784d2c44b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1343,7 +1343,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> end || Q <- QQs, VHost <- VHosts]. force_checkpoint_on_queue(Config) -> - [Server0, _Server1, _Server2] = + [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), QQ = ?config(queue_name, Config), @@ -1364,6 +1364,18 @@ force_checkpoint_on_queue(Config) -> #{log := #{latest_checkpoint_index := LCI}} = State, LCI =:= undefined end), + rabbit_ct_helpers:await_condition( + fun() -> + {ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]), + #{log := #{latest_checkpoint_index := LCI}} = State, + LCI =:= undefined + end), + rabbit_ct_helpers:await_condition( + fun() -> + {ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]), + #{log := #{latest_checkpoint_index := LCI}} = State, + LCI =:= undefined + end), {ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]), @@ -1380,6 +1392,20 @@ force_checkpoint_on_queue(Config) -> ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]), #{log := #{latest_checkpoint_index := LCI}} = State, (LCI =/= undefined) andalso (LCI >= N) + end), + rabbit_ct_helpers:await_condition( + fun() -> + {ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]), + ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]), + #{log := #{latest_checkpoint_index := LCI}} = State, + (LCI =/= undefined) andalso (LCI >= N) + end), + rabbit_ct_helpers:await_condition( + fun() -> + {ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]), + ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]), + #{log := #{latest_checkpoint_index := LCI}} = State, + (LCI =/= undefined) andalso (LCI >= N) end). force_checkpoint(Config) ->