rabbit_khepri: Retry fence in init/1 in cases of timeout

This commit is contained in:
Michael Davis 2024-09-11 13:26:36 -04:00
parent edd8fbcb5b
commit 3afb379f0e
No known key found for this signature in database
1 changed files with 25 additions and 5 deletions

View File

@ -289,6 +289,12 @@ retry_timeout() ->
undefined -> 30000
end.
retry_limit() ->
case application:get_env(rabbit, khepri_leader_wait_retry_limit) of
{ok, T} -> T;
undefined -> 10
end.
%% @private
-spec init(IsVirgin) -> Ret when
@ -305,11 +311,7 @@ init(IsVirgin) ->
"Found the following metadata store members: ~p", [Members],
#{domain => ?RMQLOG_DOMAIN_DB}),
maybe
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " catching up on "
"replication to the Raft cluster leader", [],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= fence(retry_timeout()),
ok ?= await_replication(),
?LOG_DEBUG(
"local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
"up to the Raft cluster leader", [],
@ -331,6 +333,24 @@ init(IsVirgin) ->
end
end.
await_replication() ->
await_replication(retry_timeout(), retry_limit()).
await_replication(_Timeout, 0) ->
{error, timeout};
await_replication(Timeout, Retries) ->
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " waiting to catch up on replication "
"to the Raft cluster leader. Waiting for ~tb ms, ~tb retries left",
[Timeout, Retries],
#{domain => ?RMQLOG_DOMAIN_DB}),
case fence(Timeout) of
ok ->
ok;
{error, timeout} ->
await_replication(Timeout, Retries -1)
end.
%% @private
can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->