From 93f0b7860ecc0434191e3ffbbac283e8aeb562f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 6 Aug 2025 13:39:11 +0000 Subject: [PATCH] Retry stream SAC unregister consumer operation Retry unregistering a stream from its group in case of stream coordinator timeout/unavailability. The operation can fail during or after a network partition, which is normally, but it is harmless to retry it to clean up the SAC group. The operation is idempotent anyway. --- .../src/rabbit_stream_reader.erl | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index bc2bf8d78b..21969915c3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -4045,9 +4045,10 @@ sac_register_consumer(VH, St, PartitionIndex, Name, Pid, ConnName, SubId) -> end). sac_unregister_consumer(VH, St, Name, Pid, SubId) -> - sac_call(fun() -> - ?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId) - end). + Call = fun() -> + ?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId) + end, + sac_call(retryable_sac_call(Call)). sac_call(Call) -> case Call() of @@ -4063,3 +4064,16 @@ sac_call(Call) -> R -> R end. + +retryable_sac_call(Call) -> + fun() -> retry_sac_call(Call, 3) end. + +retry_sac_call(_Call, 0) -> + {error, coordinator_unavailable}; +retry_sac_call(Call, N) -> + case Call() of + {error, coordinator_unavailable} -> + retry_sac_call(Call, N - 1); + R -> + R + end.