Force checkpoint in all members

This commit is contained in:
Diana Parra Corbacho 2025-05-22 15:52:41 +02:00
parent 7d3292cedd
commit 0c2b6a1cb3
2 changed files with 31 additions and 3 deletions

View File

@ -2119,7 +2119,6 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
ok.
force_checkpoint_on_queue(QName) ->
Node = node(),
QNameFmt = rabbit_misc:rs(QName),
case rabbit_db_queue:get_durable(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
@ -2127,7 +2126,10 @@ force_checkpoint_on_queue(QName) ->
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT);
Nodes = amqqueue:get_nodes(Q),
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
|| Node <- Nodes],
ok;
{ok, _Q} ->
{error, not_quorum_queue};
{error, _} = E ->

View File

@ -1343,7 +1343,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
end || Q <- QQs, VHost <- VHosts].
force_checkpoint_on_queue(Config) ->
[Server0, _Server1, _Server2] =
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
@ -1364,6 +1364,18 @@ force_checkpoint_on_queue(Config) ->
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
#{log := #{latest_checkpoint_index := LCI}} = State,
LCI =:= undefined
end),
{ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),
@ -1380,6 +1392,20 @@ force_checkpoint_on_queue(Config) ->
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end),
rabbit_ct_helpers:await_condition(
fun() ->
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
#{log := #{latest_checkpoint_index := LCI}} = State,
(LCI =/= undefined) andalso (LCI >= N)
end).
force_checkpoint(Config) ->