Close stream connection in case of unexpected error from SAC coordinator

Calls to the stream SAC coordinator can fail for various reason
(e.g. a timeout because of a network partition). The stream reader does not
take into account what the SAC coordinator returns and moves on even
in case of errors. This can lead to inconsistent state for SAC groups.

This commit changes this behavior by handling unexpected errors from the
SAC coordinator and closing the connection. The client is expected to
reconnect. This is safer than risking inconsistent state.

Fixes #14040
This commit is contained in:
Arnaud Cogoluègnes 2025-06-06 11:17:51 +02:00
parent a9cf049030
commit 58f4e83c22
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
2 changed files with 77 additions and 49 deletions

View File

@ -27,6 +27,8 @@
-opaque state() :: #?MODULE{}.
-type sac_error() :: partition_index_conflict | not_found.
-export_type([state/0,
command/0]).
@ -50,7 +52,8 @@
import_state/2,
check_conf_change/1,
list_nodes/1,
state_enter/2
state_enter/2,
is_sac_error/1
]).
-export([make_purge_nodes/1,
make_update_conf/1]).
@ -89,7 +92,7 @@
pid(),
binary(),
integer()) ->
{ok, boolean()} | {error, term()}.
{ok, boolean()} | {error, sac_error() | term()}.
register_consumer(VirtualHost,
Stream,
PartitionIndex,
@ -110,7 +113,7 @@ register_consumer(VirtualHost,
binary(),
pid(),
integer()) ->
ok | {error, term()}.
ok | {error, sac_error() | term()}.
unregister_consumer(VirtualHost,
Stream,
ConsumerName,
@ -122,13 +125,15 @@ unregister_consumer(VirtualHost,
connection_pid = ConnectionPid,
subscription_id = SubscriptionId}).
-spec activate_consumer(binary(), binary(), binary()) -> ok.
-spec activate_consumer(binary(), binary(), binary()) ->
ok | {error, sac_error() | term()}.
activate_consumer(VH, Stream, Name) ->
process_command(#command_activate_consumer{vhost =VH,
stream = Stream,
consumer_name= Name}).
-spec connection_reconnected(connection_pid()) -> ok.
-spec connection_reconnected(connection_pid()) ->
ok | {error, sac_error() | term()}.
connection_reconnected(Pid) ->
process_command(#command_connection_reconnected{pid = Pid}).
@ -150,7 +155,7 @@ wrap_cmd(Cmd) ->
%% (CLI command)
-spec consumer_groups(binary(), [atom()]) ->
{ok,
[term()] | {error, atom()}}.
[term()]} | {error, sac_error() | term()}.
consumer_groups(VirtualHost, InfoKeys) ->
case ra_local_query(fun(State) ->
SacState =
@ -172,7 +177,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
%% (CLI command)
-spec group_consumers(binary(), binary(), binary(), [atom()]) ->
{ok, [term()]} |
{error, atom()}.
{error, sac_error() | term()}.
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
case ra_local_query(fun(State) ->
SacState =
@ -932,6 +937,10 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
state_enter(_, _) ->
[].
-spec is_sac_error(term()) -> boolean().
is_sac_error(Reason) ->
lists:member(Reason, ?SAC_ERRORS).
nodes_from_group(#group{consumers = Cs}) when is_list(Cs) ->
lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
Acc#{node(Pid) => true}

View File

@ -81,6 +81,7 @@
-define(UNKNOWN_FIELD, unknown_field).
-define(SILENT_CLOSE_DELAY, 3_000).
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
-define(SAC_MOD, rabbit_stream_sac_coordinator).
-import(rabbit_stream_utils, [check_write_permitted/2,
check_read_permitted/3]).
@ -722,7 +723,7 @@ open(info, {OK, S, Data},
connection_state = State2}}
end;
open(info, {sac, check_connection, _}, State) ->
rabbit_stream_sac_coordinator:connection_reconnected(self()),
_ = sac_connection_reconnected(self()),
{keep_state, State};
open(info,
{sac, #{subscription_id := SubId,
@ -794,17 +795,15 @@ open(info,
rabbit_log:debug("Subscription ~tp on ~tp has been deleted.",
[SubId, Stream]),
rabbit_log:debug("Active ~tp, message ~tp", [Active, Msg]),
case {Active, Msg} of
{false, #{stepping_down := true,
stream := St,
consumer_name := ConsumerName}} ->
rabbit_log:debug("Former active consumer gone, activating consumer " ++
"on stream ~tp, group ~tp", [St, ConsumerName]),
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
St,
ConsumerName);
_ ->
ok
_ = case {Active, Msg} of
{false, #{stepping_down := true,
stream := St,
consumer_name := ConsumerName}} ->
rabbit_log:debug("Former active consumer gone, activating consumer " ++
"on stream ~tp, group ~tp", [St, ConsumerName]),
sac_activate_consumer(VirtualHost, St, ConsumerName);
_ ->
ok
end,
{Connection0, ConnState0}
end,
@ -2554,9 +2553,8 @@ handle_frame_post_auth(Transport,
rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " ++
"has stepped down, activating consumer",
[SubscriptionId, Stream, ConsumerName]),
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
Stream,
ConsumerName),
_ = sac_activate_consumer(VirtualHost, Stream,
ConsumerName),
ok;
_ ->
ok
@ -3015,21 +3013,9 @@ handle_subscription(Transport,#stream_connection{
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
{ok, true};
maybe_register_consumer(VirtualHost,
Stream,
ConsumerName,
ConnectionName,
SubscriptionId,
Properties,
true) ->
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
self(),
ConnectionName,
SubscriptionId).
maybe_register_consumer(VH, St, Name, ConnName, SubId, Properties, true) ->
PartitionIndex = partition_index(VH, St, Properties),
sac_register_consumer(VH, St, PartitionIndex, Name, self(), ConnName, SubId).
maybe_send_consumer_update(Transport,
Connection = #stream_connection{
@ -3175,13 +3161,12 @@ maybe_unregister_consumer(VirtualHost,
ConsumerName = consumer_name(Properties),
Requests1 = maps:fold(
fun(_, #request{content =
#{active := false,
subscription_id := SubId,
stepping_down := true}}, Acc) when SubId =:= SubscriptionId ->
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
Stream,
ConsumerName),
fun(_, #request{content = #{active := false,
subscription_id := SubId,
stepping_down := true}}, Acc)
when SubId =:= SubscriptionId ->
_ = sac_activate_consumer(VirtualHost, Stream,
ConsumerName),
rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " ++
"group '~tp', sending activation.",
[Stream, ConsumerName]),
@ -3190,11 +3175,8 @@ maybe_unregister_consumer(VirtualHost,
Acc#{K => V}
end, maps:new(), Requests),
_ = rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,
Stream,
ConsumerName,
self(),
SubscriptionId),
_ = sac_unregister_consumer(VirtualHost, Stream, ConsumerName,
self(), SubscriptionId),
Requests1.
partition_index(VirtualHost, Stream, Properties) ->
@ -4037,3 +4019,40 @@ stream_from_consumers(SubId, Consumers) ->
%% for a bit so they can't DOS us with repeated failed logins etc.
silent_close_delay() ->
timer:sleep(?SILENT_CLOSE_DELAY).
sac_connection_reconnected(Pid) ->
sac_call(fun() ->
?SAC_MOD:connection_reconnected(Pid)
end).
sac_activate_consumer(VH, St, Name) ->
sac_call(fun() ->
?SAC_MOD:activate_consumer(VH, St, Name)
end).
sac_register_consumer(VH, St, PartitionIndex, Name, Pid, ConnName, SubId) ->
sac_call(fun() ->
?SAC_MOD:register_consumer(VH, St, PartitionIndex,
Name, Pid, ConnName,
SubId)
end).
sac_unregister_consumer(VH, St, Name, Pid, SubId) ->
sac_call(fun() ->
?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId)
end).
sac_call(Call) ->
case Call() of
{error, Reason} = Err ->
case ?SAC_MOD:is_sac_error(Reason) of
true ->
Err;
_ ->
rabbit_log:info("Stream SAC coordinator call failed with ~tp",
[Reason]),
throw({stop, {shutdown, stream_sac_coordinator_error}})
end;
R ->
R
end.