Prevent blocked groups in stream SAC with fine-grained status

A boolean status in the stream SAC coordinator is not enough to track
the evolution of a consumer. For example, a former active consumer that
is stepping down can go down before another consumer in the group is
activated, leaving the coordinator waiting for an activation request
that will never arrive and the group without any active consumer.

This commit introduces 3 statuses: active (formerly "true"), waiting
(formerly "false"), and deactivating. The coordinator now knows when a
deactivating consumer goes down and triggers a rebalancing to avoid a
stuck group.
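
As an illustration, a minimal sketch of how the fine-grained activity
status can drive that decision (not the actual coordinator code;
rebalance/1 is a hypothetical stand-in for the real rebalancing logic):

    -module(sac_activity_sketch).
    -export([handle_consumer_down/2]).

    -spec handle_consumer_down(active | waiting | deactivating, term()) -> term().
    handle_consumer_down(deactivating, Group) ->
        %% the former active consumer died before the hand-over completed:
        %% rebalance instead of waiting for an activation request that
        %% will never arrive
        rebalance(Group);
    handle_consumer_down(active, Group) ->
        %% the active consumer is gone, a new one must be picked
        rebalance(Group);
    handle_consumer_down(waiting, Group) ->
        %% a waiting consumer left; the active consumer is unaffected
        Group.

    rebalance(Group) ->
        %% placeholder for the real rebalancing
        Group.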

This commit also introduces a status for the connectivity state of a
consumer. The possible values are: connected, disconnected, and
presumed_down. Consumers are connected by default; they become
disconnected if the coordinator receives a down event with a
noconnection reason, meaning the consumer's node has been disconnected
from the other nodes. Consumers become connected again when their node
rejoins the other nodes.

Disconnected consumers are still considered part of their group, as they
are expected to come back at some point. For example, there is no
rebalancing in a group if its active consumer gets disconnected.

The coordinator sets up a timer when a disconnection occurs. When the
timer expires, the corresponding disconnected consumers move to the
"presumed down" state. At this point they are no longer considered part
of their respective groups and are excluded from rebalancing decisions.
They are expected to be removed from the group by the appropriate down
event of a monitor.

So the consumer status is now a tuple, e.g. {connected, active}. Note
this is an implementation detail: only the stream SAC coordinator deals with
the status of stream SAC consumers.
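
A minimal sketch of the status tuple and of the connectivity transitions
described above (the event names are hypothetical; the real types live in
the coordinator header file):

    -module(sac_status_sketch).
    -export([connectivity_event/2]).

    -type consumer_activity() :: active | waiting | deactivating.
    -type consumer_connectivity() :: connected | disconnected | presumed_down.
    %% the consumer status is a tuple, e.g. {connected, active}
    -type consumer_status() :: {consumer_connectivity(), consumer_activity()}.

    -spec connectivity_event(term(), consumer_status()) -> consumer_status().
    connectivity_event({down, noconnection}, {connected, Activity}) ->
        %% the consumer's node got disconnected from the other nodes
        {disconnected, Activity};
    connectivity_event(nodeup, {disconnected, Activity}) ->
        %% the node came back before the timer expired
        {connected, Activity};
    connectivity_event(disconnected_timeout, {disconnected, Activity}) ->
        %% timer expired: the consumer is presumed down and excluded
        %% from rebalancing decisions
        {presumed_down, Activity};
    connectivity_event(_, Status) ->
        Status.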

Two new configuration entries are introduced:
 * rabbit.stream_sac_disconnected_timeout: the duration in ms of the
   disconnected-to-forgotten timer.
 * rabbit.stream_cmd_timeout: the timeout in ms used to apply Ra commands
   in the coordinator. This used to be a fixed value of 30 seconds; the
   default value is unchanged. The setting was introduced to make
   integration tests faster.
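
For example, both entries can be set through the advanced.config file
(the values below are illustrative, not necessarily the defaults):

    %% advanced.config
    [
     {rabbit, [
       %% time in ms a disconnected consumer stays in its group before
       %% being presumed down
       {stream_sac_disconnected_timeout, 60000},
       %% timeout in ms to apply Ra commands in the stream coordinator
       %% (previously hard-coded to 30 seconds)
       {stream_cmd_timeout, 30000}
     ]}
    ].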

Fixes #14070

(cherry picked from commit d1aab61566)
Arnaud Cogoluègnes 2025-06-10 12:01:18 +02:00 committed by Mergify
parent 329d6f5757
commit dde4b0d3bf
17 changed files with 4803 additions and 591 deletions


@@ -268,7 +268,7 @@ PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features di
 PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy
 PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator
-PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
+PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
 PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
 PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
 PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor


@@ -117,6 +117,7 @@
 , rabbit_local_random_exchange_SUITE
 , rabbit_message_interceptor_SUITE
 , rabbit_stream_coordinator_SUITE
+, rabbit_stream_sac_coordinator_v4_SUITE
 , rabbit_stream_sac_coordinator_SUITE
 , rabbitmq_4_0_deprecations_SUITE
 , rabbitmq_queues_cli_integration_SUITE


@ -15,7 +15,7 @@
apply/3, apply/3,
state_enter/2, state_enter/2,
init_aux/1, init_aux/1,
handle_aux/6, handle_aux/5,
tick/2, tick/2,
version/0, version/0,
which_module/1, which_module/1,
@ -31,8 +31,7 @@
transfer_leadership/1, transfer_leadership/1,
forget_node/1, forget_node/1,
status/0, status/0,
member_overview/0 member_overview/0]).
]).
%% stream API %% stream API
-export([new_stream/2, -export([new_stream/2,
@ -42,8 +41,7 @@
add_replica/2, add_replica/2,
delete_replica/2, delete_replica/2,
register_listener/1, register_listener/1,
register_local_member_listener/1 register_local_member_listener/1]).
]).
-export([local_pid/1, -export([local_pid/1,
writer_pid/1, writer_pid/1,
@ -57,10 +55,8 @@
query_stream_overview/2, query_stream_overview/2,
ra_local_query/1]). ra_local_query/1]).
-export([log_overview/1, -export([log_overview/1,
key_metrics_rpc/1 key_metrics_rpc/1]).
]).
%% for SAC coordinator %% for SAC coordinator
-export([sac_state/1]). -export([sac_state/1]).
@ -68,11 +64,10 @@
%% for testing and debugging %% for testing and debugging
-export([eval_listeners/3, -export([eval_listeners/3,
replay/1, replay/1,
state/0]). state/0,
sac_state/0]).
-import(rabbit_queue_type_util, [ -import(rabbit_queue_type_util, [erpc_call/5]).
erpc_call/5
]).
-rabbit_boot_step({?MODULE, -rabbit_boot_step({?MODULE,
[{description, "Restart stream coordinator"}, [{description, "Restart stream coordinator"},
@ -90,6 +85,10 @@
-include("amqqueue.hrl"). -include("amqqueue.hrl").
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s -define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
-define(V2_OR_MORE(Vsn), Vsn >= 2).
-define(V5_OR_MORE(Vsn), Vsn >= 5).
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).
-type state() :: #?MODULE{}. -type state() :: #?MODULE{}.
-type args() :: #{index := ra:index(), -type args() :: #{index := ra:index(),
@ -119,7 +118,8 @@
{retention_updated, stream_id(), args()} | {retention_updated, stream_id(), args()} |
{mnesia_updated, stream_id(), args()} | {mnesia_updated, stream_id(), args()} |
{sac, rabbit_stream_sac_coordinator:command()} | {sac, rabbit_stream_sac_coordinator:command()} |
ra_machine:effect(). {machine_version, ra_machine:version(), ra_machine:version()} |
ra_machine:builtin_command().
-export_type([command/0]). -export_type([command/0]).
@ -278,6 +278,16 @@ state() ->
Any Any
end. end.
%% for debugging
sac_state() ->
case state() of
S when is_record(S, ?MODULE) ->
sac_state(S);
R ->
R
end.
writer_pid(StreamId) when is_list(StreamId) -> writer_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_writer_pid, [StreamId]}, MFA = {?MODULE, query_writer_pid, [StreamId]},
query_pid(StreamId, MFA). query_pid(StreamId, MFA).
@ -426,10 +436,16 @@ process_command(Cmd) ->
process_command([], _Cmd) -> process_command([], _Cmd) ->
{error, coordinator_unavailable}; {error, coordinator_unavailable};
process_command([Server | Servers], Cmd) -> process_command([Server | Servers], Cmd) ->
case ra:process_command(Server, Cmd, ?CMD_TIMEOUT) of case ra:process_command(Server, Cmd, cmd_timeout()) of
{timeout, _} -> {timeout, _} ->
CmdLabel = case Cmd of
{sac, SacCmd} ->
element(1, SacCmd);
_ ->
element(1, Cmd)
end,
rabbit_log:warning("Coordinator timeout on server ~w when processing command ~W", rabbit_log:warning("Coordinator timeout on server ~w when processing command ~W",
[element(2, Server), element(1, Cmd), 10]), [element(2, Server), CmdLabel, 10]),
process_command(Servers, Cmd); process_command(Servers, Cmd);
{error, noproc} -> {error, noproc} ->
process_command(Servers, Cmd); process_command(Servers, Cmd);
@ -439,6 +455,9 @@ process_command([Server | Servers], Cmd) ->
Reply Reply
end. end.
cmd_timeout() ->
application:get_env(rabbit, stream_cmd_timeout, ?CMD_TIMEOUT).
ensure_coordinator_started() -> ensure_coordinator_started() ->
Local = {?MODULE, node()}, Local = {?MODULE, node()},
ExpectedMembers = expected_coord_members(), ExpectedMembers = expected_coord_members(),
@ -520,13 +539,16 @@ reachable_coord_members() ->
Nodes = rabbit_nodes:list_reachable(), Nodes = rabbit_nodes:list_reachable(),
[{?MODULE, Node} || Node <- Nodes]. [{?MODULE, Node} || Node <- Nodes].
version() -> 4. version() -> 5.
which_module(_) -> which_module(_) ->
?MODULE. ?MODULE.
init(_Conf) -> init(#{machine_version := Vsn}) when ?V5_OR_MORE(Vsn) ->
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}. #?MODULE{single_active_consumer =
rabbit_stream_sac_coordinator:init_state()};
init(_) ->
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator_v4:init_state()}.
-spec apply(ra_machine:command_meta_data(), command(), state()) -> -spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), term(), ra_machine:effects()}. {state(), term(), ra_machine:effects()}.
@ -564,12 +586,13 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
end; end;
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0, apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
monitors = Monitors0} = State0) -> monitors = Monitors0} = State0) ->
{SacState1, Reply, Effects0} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0), Mod = sac_module(Meta),
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
{SacState2, Monitors1, Effects1} = {SacState2, Monitors1, Effects1} =
rabbit_stream_sac_coordinator:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0), Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
return(Meta, State0#?MODULE{single_active_consumer = SacState2, return(Meta, State0#?MODULE{single_active_consumer = SacState2,
monitors = Monitors1}, Reply, Effects1); monitors = Monitors1}, Reply, Effects1);
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd, apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0, #?MODULE{streams = Streams0,
monitors = Monitors0, monitors = Monitors0,
listeners = StateListeners0, listeners = StateListeners0,
@ -581,7 +604,7 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
[] []
end, end,
case maps:take(Pid, Monitors0) of case maps:take(Pid, Monitors0) of
{{StreamId, listener}, Monitors} when MachineVersion < 2 -> {{StreamId, listener}, Monitors} when Vsn < 2 ->
Listeners = case maps:take(StreamId, StateListeners0) of Listeners = case maps:take(StreamId, StateListeners0) of
error -> error ->
StateListeners0; StateListeners0;
@ -595,7 +618,7 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
end, end,
return(Meta, State#?MODULE{listeners = Listeners, return(Meta, State#?MODULE{listeners = Listeners,
monitors = Monitors}, ok, Effects0); monitors = Monitors}, ok, Effects0);
{{PidStreams, listener}, Monitors} when MachineVersion >= 2 -> {{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) ->
Streams = maps:fold( Streams = maps:fold(
fun(StreamId, _, Acc) -> fun(StreamId, _, Acc) ->
case Acc of case Acc of
@ -629,9 +652,11 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
monitors = Monitors1}, ok, Effects0) monitors = Monitors1}, ok, Effects0)
end; end;
{sac, Monitors1} -> {sac, Monitors1} ->
{SacState1, Effects} = rabbit_stream_sac_coordinator:handle_connection_down(Pid, SacState0), {SacState1, SacEffects} = sac_handle_connection_down(SacState0, Pid,
Reason, Vsn),
return(Meta, State#?MODULE{single_active_consumer = SacState1, return(Meta, State#?MODULE{single_active_consumer = SacState1,
monitors = Monitors1}, ok, Effects); monitors = Monitors1},
ok, [Effects0 ++ SacEffects]);
error -> error ->
return(Meta, State, ok, Effects0) return(Meta, State, ok, Effects0)
end; end;
@ -657,11 +682,11 @@ apply(#{machine_version := MachineVersion} = Meta,
return(Meta, State0, stream_not_found, []) return(Meta, State0, stream_not_found, [])
end; end;
apply(#{machine_version := MachineVersion} = Meta, apply(#{machine_version := Vsn} = Meta,
{register_listener, #{pid := Pid, {register_listener, #{pid := Pid,
stream_id := StreamId} = Args}, stream_id := StreamId} = Args},
#?MODULE{streams = Streams, #?MODULE{streams = Streams,
monitors = Monitors0} = State0) when MachineVersion >= 2 -> monitors = Monitors0} = State0) when ?V2_OR_MORE(Vsn) ->
Node = maps:get(node, Args, node(Pid)), Node = maps:get(node, Args, node(Pid)),
Type = maps:get(type, Args, leader), Type = maps:get(type, Args, leader),
@ -685,9 +710,11 @@ apply(#{machine_version := MachineVersion} = Meta,
_ -> _ ->
return(Meta, State0, stream_not_found, []) return(Meta, State0, stream_not_found, [])
end; end;
apply(Meta, {nodeup, Node} = Cmd, apply(#{machine_version := Vsn} = Meta,
{nodeup, Node} = Cmd,
#?MODULE{monitors = Monitors0, #?MODULE{monitors = Monitors0,
streams = Streams0} = State) -> streams = Streams0,
single_active_consumer = Sac0} = State) ->
%% reissue monitors for all disconnected members %% reissue monitors for all disconnected members
{Effects0, Monitors} = {Effects0, Monitors} =
maps:fold( maps:fold(
@ -701,14 +728,24 @@ apply(Meta, {nodeup, Node} = Cmd,
{Acc, Mon} {Acc, Mon}
end end
end, {[], Monitors0}, Streams0), end, {[], Monitors0}, Streams0),
{Streams, Effects} = {Streams, Effects1} =
maps:fold(fun (Id, S0, {Ss, E0}) -> maps:fold(fun (Id, S0, {Ss, E0}) ->
S1 = update_stream(Meta, Cmd, S0), S1 = update_stream(Meta, Cmd, S0),
{S, E} = evaluate_stream(Meta, S1, E0), {S, E} = evaluate_stream(Meta, S1, E0),
{Ss#{Id => S}, E} {Ss#{Id => S}, E}
end, {Streams0, Effects0}, Streams0), end, {Streams0, Effects0}, Streams0),
{Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of
true ->
SacMod = sac_module(Meta),
SacMod:handle_node_reconnected(Node,
Sac0, Effects1);
false ->
{Sac0, Effects1}
end,
return(Meta, State#?MODULE{monitors = Monitors, return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams}, ok, Effects); streams = Streams,
single_active_consumer = Sac1}, ok, Effects2);
apply(Meta, {machine_version, From, To}, State0) -> apply(Meta, {machine_version, From, To}, State0) ->
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, " rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, "
++ "applying incremental upgrade.", [From, To]), ++ "applying incremental upgrade.", [From, To]),
@ -719,6 +756,12 @@ apply(Meta, {machine_version, From, To}, State0) ->
{S1, Eff0 ++ Eff1} {S1, Eff0 ++ Eff1}
end, {State0, []}, lists:seq(From, To - 1)), end, {State0, []}, lists:seq(From, To - 1)),
return(Meta, State1, ok, Effects); return(Meta, State1, ok, Effects);
apply(Meta, {timeout, {sac, node_disconnected, #{connection_pid := Pid}}},
#?MODULE{single_active_consumer = SacState0} = State0) ->
Mod = sac_module(Meta),
{SacState1, Effects} = Mod:presume_connection_down(Pid, SacState0),
return(Meta, State0#?MODULE{single_active_consumer = SacState1}, ok,
Effects);
apply(Meta, UnkCmd, State) -> apply(Meta, UnkCmd, State) ->
rabbit_log:debug("~ts: unknown command ~W", rabbit_log:debug("~ts: unknown command ~W",
[?MODULE, UnkCmd, 10]), [?MODULE, UnkCmd, 10]),
@ -737,16 +780,23 @@ state_enter(recover, _) ->
put('$rabbit_vm_category', ?MODULE), put('$rabbit_vm_category', ?MODULE),
[]; [];
state_enter(leader, #?MODULE{streams = Streams, state_enter(leader, #?MODULE{streams = Streams,
monitors = Monitors}) -> monitors = Monitors,
single_active_consumer = SacState}) ->
Pids = maps:keys(Monitors), Pids = maps:keys(Monitors),
%% monitor all the known nodes %% monitor all the known nodes
Nodes = all_member_nodes(Streams), Nodes = all_member_nodes(Streams),
NodeMons = [{monitor, node, N} || N <- Nodes], NodeMons = [{monitor, node, N} || N <- Nodes],
NodeMons ++ [{aux, fail_active_actions} | SacEffects = ?SAC_CURRENT:state_enter(leader, SacState),
[{monitor, process, P} || P <- Pids]]; SacEffects ++ NodeMons ++ [{aux, fail_active_actions} |
[{monitor, process, P} || P <- Pids]];
state_enter(_S, _) -> state_enter(_S, _) ->
[]. [].
sac_module(#{machine_version := Vsn}) when ?V5_OR_MORE(Vsn) ->
?SAC_CURRENT;
sac_module(_) ->
?SAC_V4.
all_member_nodes(Streams) -> all_member_nodes(Streams) ->
maps:keys( maps:keys(
maps:fold( maps:fold(
@ -754,8 +804,9 @@ all_member_nodes(Streams) ->
maps:merge(Acc, M) maps:merge(Acc, M)
end, #{}, Streams)). end, #{}, Streams)).
tick(_Ts, _State) -> tick(_Ts, #?MODULE{single_active_consumer = SacState}) ->
[{aux, maybe_resize_coordinator_cluster}]. [{aux, maybe_resize_coordinator_cluster} |
maybe_update_sac_configuration(SacState)].
members() -> members() ->
%% TODO: this can be replaced with a ra_leaderboard %% TODO: this can be replaced with a ra_leaderboard
@ -780,7 +831,7 @@ members() ->
end end
end. end.
maybe_resize_coordinator_cluster() -> maybe_resize_coordinator_cluster(LeaderPid, SacNodes, MachineVersion) ->
spawn(fun() -> spawn(fun() ->
RabbitIsRunning = rabbit:is_running(), RabbitIsRunning = rabbit:is_running(),
case members() of case members() of
@ -806,19 +857,49 @@ maybe_resize_coordinator_cluster() ->
case MemberNodes -- RabbitNodes of case MemberNodes -- RabbitNodes of
[] -> [] ->
ok; ok;
[Old | _] -> [Old | _] when length(RabbitNodes) > 0 ->
%% this ought to be rather rare as the stream %% this ought to be rather rare as the stream
%% coordinator member is now removed as part %% coordinator member is now removed as part
%% of the forget_cluster_node command %% of the forget_cluster_node command
rabbit_log:info("~ts: Rabbit node(s) removed from the cluster, " rabbit_log:info("~ts: Rabbit node(s) removed "
"from the cluster, "
"deleting: ~w", [?MODULE, Old]), "deleting: ~w", [?MODULE, Old]),
remove_member(Leader, Members, Old) _ = remove_member(Leader, Members, Old),
end; ok
end,
maybe_handle_stale_nodes(SacNodes, RabbitNodes,
LeaderPid,
MachineVersion);
_ -> _ ->
ok ok
end end
end). end).
maybe_handle_stale_nodes(SacNodes, BrokerNodes,
LeaderPid, Vsn) when ?V5_OR_MORE(Vsn) ->
case SacNodes -- BrokerNodes of
[] ->
ok;
Stale when length(BrokerNodes) > 0 ->
rabbit_log:debug("Stale nodes detected in stream SAC "
"coordinator: ~w. Purging state.",
[Stale]),
ra:pipeline_command(LeaderPid, sac_make_purge_nodes(Stale)),
ok;
_ ->
ok
end;
maybe_handle_stale_nodes(_, _, _, _) ->
ok.
maybe_update_sac_configuration(SacState) ->
case sac_check_conf_change(SacState) of
{new, UpdatedConf} ->
[{append, sac_make_update_conf(UpdatedConf), noreply}];
_ ->
[]
end.
add_member(Members, Node) -> add_member(Members, Node) ->
MinMacVersion = erpc:call(Node, ?MODULE, version, []), MinMacVersion = erpc:call(Node, ?MODULE, version, []),
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion), Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
@ -892,65 +973,64 @@ init_aux(_Name) ->
%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout? %% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
handle_aux(leader, _, maybe_resize_coordinator_cluster, handle_aux(leader, _, maybe_resize_coordinator_cluster,
#aux{resizer = undefined} = Aux, LogState, _) -> #aux{resizer = undefined} = Aux, RaAux) ->
Pid = maybe_resize_coordinator_cluster(), Leader = ra_aux:leader_id(RaAux),
{no_reply, Aux#aux{resizer = Pid}, LogState, [{monitor, process, aux, Pid}]}; MachineVersion = ra_aux:effective_machine_version(RaAux),
SacNodes = sac_list_nodes(ra_aux:machine_state(RaAux), MachineVersion),
Pid = maybe_resize_coordinator_cluster(Leader, SacNodes, MachineVersion),
{no_reply, Aux#aux{resizer = Pid}, RaAux, [{monitor, process, aux, Pid}]};
handle_aux(leader, _, maybe_resize_coordinator_cluster, handle_aux(leader, _, maybe_resize_coordinator_cluster,
AuxState, LogState, _) -> AuxState, RaAux) ->
%% Coordinator resizing is still happening, let's ignore this tick event %% Coordinator resizing is still happening, let's ignore this tick event
{no_reply, AuxState, LogState}; {no_reply, AuxState, RaAux};
handle_aux(leader, _, {down, Pid, _}, handle_aux(leader, _, {down, Pid, _},
#aux{resizer = Pid} = Aux, LogState, _) -> #aux{resizer = Pid} = Aux, RaAux) ->
%% Coordinator resizing has finished %% Coordinator resizing has finished
{no_reply, Aux#aux{resizer = undefined}, LogState}; {no_reply, Aux#aux{resizer = undefined}, RaAux};
handle_aux(leader, _, {start_writer, StreamId, handle_aux(leader, _, {start_writer, StreamId,
#{epoch := Epoch, node := Node} = Args, Conf}, #{epoch := Epoch, node := Node} = Args, Conf},
Aux, LogState, _) -> Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'start_writer'" rabbit_log:debug("~ts: running action: 'start_writer'"
" for ~ts on node ~w in epoch ~b", " for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]), [?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_start_writer(StreamId, Args, Conf), ActionFun = phase_start_writer(StreamId, Args, Conf),
run_action(starting, StreamId, Args, ActionFun, Aux, LogState); run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {start_replica, StreamId, handle_aux(leader, _, {start_replica, StreamId,
#{epoch := Epoch, node := Node} = Args, Conf}, #{epoch := Epoch, node := Node} = Args, Conf},
Aux, LogState, _) -> Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'start_replica'" rabbit_log:debug("~ts: running action: 'start_replica'"
" for ~ts on node ~w in epoch ~b", " for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]), [?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_start_replica(StreamId, Args, Conf), ActionFun = phase_start_replica(StreamId, Args, Conf),
run_action(starting, StreamId, Args, ActionFun, Aux, LogState); run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {stop, StreamId, #{node := Node, handle_aux(leader, _, {stop, StreamId, #{node := Node,
epoch := Epoch} = Args, Conf}, epoch := Epoch} = Args, Conf},
Aux, LogState, _) -> Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'stop'" rabbit_log:debug("~ts: running action: 'stop'"
" for ~ts on node ~w in epoch ~b", " for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]), [?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_stop_member(StreamId, Args, Conf), ActionFun = phase_stop_member(StreamId, Args, Conf),
run_action(stopping, StreamId, Args, ActionFun, Aux, LogState); run_action(stopping, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf}, handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
#aux{actions = _Monitors} = Aux, LogState, #aux{actions = _Monitors} = Aux, RaAux) ->
#?MODULE{streams = _Streams}) ->
rabbit_log:debug("~ts: running action: 'update_mnesia'" rabbit_log:debug("~ts: running action: 'update_mnesia'"
" for ~ts", [?MODULE, StreamId]), " for ~ts", [?MODULE, StreamId]),
ActionFun = phase_update_mnesia(StreamId, Args, Conf), ActionFun = phase_update_mnesia(StreamId, Args, Conf),
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState); run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf}, handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
#aux{actions = _Monitors} = Aux, LogState, #aux{actions = _Monitors} = Aux, RaAux) ->
#?MODULE{streams = _Streams}) ->
rabbit_log:debug("~ts: running action: 'update_retention'" rabbit_log:debug("~ts: running action: 'update_retention'"
" for ~ts", [?MODULE, StreamId]), " for ~ts", [?MODULE, StreamId]),
ActionFun = phase_update_retention(StreamId, Args), ActionFun = phase_update_retention(StreamId, Args),
run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState); run_action(update_retention, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf}, handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
#aux{actions = _Monitors} = Aux, LogState, #aux{actions = _Monitors} = Aux, RaAux) ->
#?MODULE{streams = _Streams}) ->
rabbit_log:debug("~ts: running action: 'delete_member'" rabbit_log:debug("~ts: running action: 'delete_member'"
" for ~ts ~ts", [?MODULE, StreamId, Node]), " for ~ts ~ts", [?MODULE, StreamId, Node]),
ActionFun = phase_delete_member(StreamId, Args, Conf), ActionFun = phase_delete_member(StreamId, Args, Conf),
run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState); run_action(delete_member, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, fail_active_actions, handle_aux(leader, _, fail_active_actions,
#aux{actions = Actions} = Aux, LogState, #aux{actions = Actions} = Aux, RaAux) ->
#?MODULE{streams = Streams}) ->
%% this bit of code just creates an exclude map of currently running %% this bit of code just creates an exclude map of currently running
%% tasks to avoid failing them, this could only really happen during %% tasks to avoid failing them, this could only really happen during
%% a leader flipflap %% a leader flipflap
@ -958,14 +1038,15 @@ handle_aux(leader, _, fail_active_actions,
|| {P, {S, _, _}} <- maps_to_list(Actions), || {P, {S, _, _}} <- maps_to_list(Actions),
is_process_alive(P)]), is_process_alive(P)]),
rabbit_log:debug("~ts: failing actions: ~w", [?MODULE, Exclude]), rabbit_log:debug("~ts: failing actions: ~w", [?MODULE, Exclude]),
#?MODULE{streams = Streams} = ra_aux:machine_state(RaAux),
fail_active_actions(Streams, Exclude), fail_active_actions(Streams, Exclude),
{no_reply, Aux, LogState, []}; {no_reply, Aux, RaAux, []};
handle_aux(leader, _, {down, Pid, normal}, handle_aux(leader, _, {down, Pid, normal},
#aux{actions = Monitors} = Aux, LogState, _) -> #aux{actions = Monitors} = Aux, RaAux) ->
%% action process finished normally, just remove from actions map %% action process finished normally, just remove from actions map
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, LogState, []}; {no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, RaAux, []};
handle_aux(leader, _, {down, Pid, Reason}, handle_aux(leader, _, {down, Pid, Reason},
#aux{actions = Monitors0} = Aux, LogState, _) -> #aux{actions = Monitors0} = Aux, RaAux) ->
%% An action has failed - report back to the state machine %% An action has failed - report back to the state machine
case maps:get(Pid, Monitors0, undefined) of case maps:get(Pid, Monitors0, undefined) of
{StreamId, Action, #{node := Node, epoch := Epoch} = Args} -> {StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
@ -976,13 +1057,13 @@ handle_aux(leader, _, {down, Pid, Reason},
Cmd = {action_failed, StreamId, Args#{action => Action}}, Cmd = {action_failed, StreamId, Args#{action => Action}},
send_self_command(Cmd), send_self_command(Cmd),
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, {no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)},
LogState, []}; RaAux, []};
undefined -> undefined ->
%% should this ever happen? %% should this ever happen?
{no_reply, Aux, LogState, []} {no_reply, Aux, RaAux, []}
end; end;
handle_aux(_, _, _, AuxState, LogState, _) -> handle_aux(_, _, _, AuxState, RaAux) ->
{no_reply, AuxState, LogState}. {no_reply, AuxState, RaAux}.
overview(#?MODULE{streams = Streams, overview(#?MODULE{streams = Streams,
monitors = Monitors, monitors = Monitors,
@ -1018,7 +1099,7 @@ stream_overview0(#stream{epoch = Epoch,
run_action(Action, StreamId, #{node := _Node, run_action(Action, StreamId, #{node := _Node,
epoch := _Epoch} = Args, epoch := _Epoch} = Args,
ActionFun, #aux{actions = Actions0} = Aux, Log) -> ActionFun, #aux{actions = Actions0} = Aux, RaAux) ->
Coordinator = self(), Coordinator = self(),
Pid = spawn_link(fun() -> Pid = spawn_link(fun() ->
ActionFun(), ActionFun(),
@ -1026,7 +1107,7 @@ run_action(Action, StreamId, #{node := _Node,
end), end),
Effects = [{monitor, process, aux, Pid}], Effects = [{monitor, process, aux, Pid}],
Actions = Actions0#{Pid => {StreamId, Action, Args}}, Actions = Actions0#{Pid => {StreamId, Action, Args}},
{no_reply, Aux#aux{actions = Actions}, Log, Effects}. {no_reply, Aux#aux{actions = Actions}, RaAux, Effects}.
wrap_reply(From, Reply) -> wrap_reply(From, Reply) ->
[{reply, From, {wrap_reply, Reply}}]. [{reply, From, {wrap_reply, Reply}}].
@ -1641,20 +1722,20 @@ update_stream0(_Meta, {update_config, _StreamId, Conf},
update_stream0(_Meta, _Cmd, undefined) -> update_stream0(_Meta, _Cmd, undefined) ->
undefined. undefined.
inform_listeners_eol(MachineVersion, inform_listeners_eol(Vsn,
#stream{target = deleted, #stream{target = deleted,
listeners = Listeners, listeners = Listeners,
queue_ref = QRef}) queue_ref = QRef})
when MachineVersion =< 1 -> when Vsn =< 1 ->
lists:map(fun(Pid) -> lists:map(fun(Pid) ->
{send_msg, Pid, {send_msg, Pid,
{queue_event, QRef, eol}, {queue_event, QRef, eol},
cast} cast}
end, maps:keys(Listeners)); end, maps:keys(Listeners));
inform_listeners_eol(MachineVersion, inform_listeners_eol(Vsn,
#stream{target = deleted, #stream{target = deleted,
listeners = Listeners, listeners = Listeners,
queue_ref = QRef}) when MachineVersion >= 2 -> queue_ref = QRef}) when ?V2_OR_MORE(Vsn) ->
LPidsMap = maps:fold(fun({P, _}, _V, Acc) -> LPidsMap = maps:fold(fun({P, _}, _V, Acc) ->
Acc#{P => ok} Acc#{P => ok}
end, #{}, Listeners), end, #{}, Listeners),
@ -1702,9 +1783,9 @@ eval_listeners(MachineVersion, #stream{listeners = Listeners0,
_ -> _ ->
{Stream, Effects0} {Stream, Effects0}
end; end;
eval_listeners(MachineVersion, #stream{listeners = Listeners0} = Stream0, eval_listeners(Vsn, #stream{listeners = Listeners0} = Stream0,
_OldStream, Effects0) _OldStream, Effects0)
when MachineVersion >= 2 -> when ?V2_OR_MORE(Vsn) ->
%% Iterating over stream listeners. %% Iterating over stream listeners.
%% Returning the new map of listeners and the effects (notification of changes) %% Returning the new map of listeners and the effects (notification of changes)
{Listeners1, Effects1} = {Listeners1, Effects1} =
@ -2199,8 +2280,10 @@ machine_version(1, 2, State = #?MODULE{streams = Streams0,
monitors = Monitors2, monitors = Monitors2,
listeners = undefined}, Effects}; listeners = undefined}, Effects};
machine_version(2, 3, State) -> machine_version(2, 3, State) ->
rabbit_log:info("Stream coordinator machine version changes from 2 to 3, updating state."), rabbit_log:info("Stream coordinator machine version changes from 2 to 3, "
{State#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}, "updating state."),
SacState = rabbit_stream_sac_coordinator_v4:init_state(),
{State#?MODULE{single_active_consumer = SacState},
[]}; []};
machine_version(3, 4, #?MODULE{streams = Streams0} = State) -> machine_version(3, 4, #?MODULE{streams = Streams0} = State) ->
rabbit_log:info("Stream coordinator machine version changes from 3 to 4, updating state."), rabbit_log:info("Stream coordinator machine version changes from 3 to 4, updating state."),
@ -2214,6 +2297,11 @@ machine_version(3, 4, #?MODULE{streams = Streams0} = State) ->
end, Members)} end, Members)}
end, Streams0), end, Streams0),
{State#?MODULE{streams = Streams}, []}; {State#?MODULE{streams = Streams}, []};
machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
rabbit_log:info("Stream coordinator machine version changes from 4 to 5, updating state."),
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
{State#?MODULE{single_active_consumer = Sac1}, []};
machine_version(From, To, State) -> machine_version(From, To, State) ->
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.", rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
[From, To]), [From, To]),
@ -2350,3 +2438,22 @@ maps_to_list(M) ->
ra_local_query(QueryFun) -> ra_local_query(QueryFun) ->
ra:local_query({?MODULE, node()}, QueryFun, infinity). ra:local_query({?MODULE, node()}, QueryFun, infinity).
sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
?SAC_CURRENT:handle_connection_down(Pid, Reason, SacState);
sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) ->
?SAC_V4:handle_connection_down(Pid, SacState).
sac_make_purge_nodes(Nodes) ->
rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).
sac_make_update_conf(Conf) ->
rabbit_stream_sac_coordinator:make_update_conf(Conf).
sac_check_conf_change(SacState) ->
rabbit_stream_sac_coordinator:check_conf_change(SacState).
sac_list_nodes(State, Vsn) when ?V5_OR_MORE(Vsn) ->
rabbit_stream_sac_coordinator:list_nodes(sac_state(State));
sac_list_nodes(_, _) ->
[].


@@ -68,6 +68,7 @@
 listeners = #{} :: undefined | #{stream_id() =>
                                  #{pid() := queue_ref()}},
 single_active_consumer = undefined :: undefined |
+                                      rabbit_stream_sac_coordinator_v4:state() |
                                       rabbit_stream_sac_coordinator:state(),
 %% future extensibility
 reserved_2}).

File diff suppressed because it is too large.


@@ -22,22 +22,34 @@
 -type subscription_id() :: byte().
 -type group_id() :: {vhost(), stream(), consumer_name()}.
 -type owner() :: binary().
+-type consumer_activity() :: active | waiting | deactivating.
+-type consumer_connectivity() :: connected | disconnected | presumed_down.
+-type consumer_status() :: {consumer_connectivity(), consumer_activity()}.
+-type conf() :: map().
+-type timestamp() :: integer().
 
 -record(consumer,
         {pid :: pid(),
          subscription_id :: subscription_id(),
          owner :: owner(), %% just a label
-         active :: boolean()}).
+         status :: consumer_status(),
+         ts :: timestamp()}).
 -record(group,
         {consumers :: [#consumer{}], partition_index :: integer()}).
 -record(rabbit_stream_sac_coordinator,
-        {groups :: #{group_id() => #group{}},
-         pids_groups ::
-             #{connection_pid() =>
-                   #{group_id() => true}}, %% inner map acts as a set
+        {groups :: groups(),
+         pids_groups :: pids_groups(),
+         conf :: conf(),
          %% future extensibility
          reserved_1,
         reserved_2}).
+
+-type consumer() :: #consumer{}.
+-type group() :: #group{}.
+-type groups() :: #{group_id() => group()}.
+%% inner map acts as a set
+-type pids_groups() :: #{connection_pid() => #{group_id() => true}}.
+
 %% commands
 -record(command_register_consumer,
         {vhost :: vhost(),
@@ -56,3 +68,9 @@
 -record(command_activate_consumer,
         {vhost :: vhost(), stream :: stream(),
          consumer_name :: consumer_name()}).
+-record(command_connection_reconnected,
+        {pid :: connection_pid()}).
+-record(command_purge_nodes,
+        {nodes :: [node()]}).
+-record(command_update_conf,
+        {conf :: conf()}).


@@ -0,0 +1,774 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_sac_coordinator_v4).
-include("rabbit_stream_sac_coordinator_v4.hrl").
-opaque command() ::
#command_register_consumer{} | #command_unregister_consumer{} |
#command_activate_consumer{}.
-opaque state() :: #rabbit_stream_sac_coordinator{}.
-export_type([state/0,
command/0]).
%% Single Active Consumer API
-export([register_consumer/7,
unregister_consumer/5,
activate_consumer/3,
consumer_groups/2,
group_consumers/4]).
-export([apply/2,
init_state/0,
send_message/2,
ensure_monitors/4,
handle_connection_down/2,
consumer_groups/3,
group_consumers/5,
overview/1,
state_to_map/1]).
-import(rabbit_stream_coordinator, [ra_local_query/1]).
-define(STATE, rabbit_stream_sac_coordinator).
%% Single Active Consumer API
-spec register_consumer(binary(),
binary(),
integer(),
binary(),
pid(),
binary(),
integer()) ->
{ok, boolean()} | {error, term()}.
register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId) ->
process_command({sac,
#command_register_consumer{vhost =
VirtualHost,
stream =
Stream,
partition_index
=
PartitionIndex,
consumer_name
=
ConsumerName,
connection_pid
=
ConnectionPid,
owner =
Owner,
subscription_id
=
SubscriptionId}}).
-spec unregister_consumer(binary(),
binary(),
binary(),
pid(),
integer()) ->
ok | {error, term()}.
unregister_consumer(VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
process_command({sac,
#command_unregister_consumer{vhost =
VirtualHost,
stream =
Stream,
consumer_name
=
ConsumerName,
connection_pid
=
ConnectionPid,
subscription_id
=
SubscriptionId}}).
-spec activate_consumer(binary(), binary(), binary()) -> ok.
activate_consumer(VirtualHost, Stream, ConsumerName) ->
process_command({sac,
#command_activate_consumer{vhost =
VirtualHost,
stream =
Stream,
consumer_name
=
ConsumerName}}).
process_command(Cmd) ->
case rabbit_stream_coordinator:process_command(Cmd) of
{ok, Res, _} ->
Res;
{error, _} = Err ->
rabbit_log:warning("SAC coordinator command ~tp returned error ~tp",
[Cmd, Err]),
Err
end.
%% return the current groups for a given virtual host
-spec consumer_groups(binary(), [atom()]) ->
{ok,
[term()] | {error, atom()}}.
consumer_groups(VirtualHost, InfoKeys) ->
case ra_local_query(fun(State) ->
SacState =
rabbit_stream_coordinator:sac_state(State),
consumer_groups(VirtualHost,
InfoKeys,
SacState)
end)
of
{ok, {_, Result}, _} -> Result;
{error, noproc} ->
%% not started yet, so no groups
{ok, []};
{error, _} = Err -> Err;
{timeout, _} -> {error, timeout}
end.
%% get the consumers of a given group in a given virtual host
-spec group_consumers(binary(), binary(), binary(), [atom()]) ->
{ok, [term()]} |
{error, atom()}.
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
case ra_local_query(fun(State) ->
SacState =
rabbit_stream_coordinator:sac_state(State),
group_consumers(VirtualHost,
Stream,
Reference,
InfoKeys,
SacState)
end)
of
{ok, {_, {ok, _} = Result}, _} -> Result;
{ok, {_, {error, _} = Err}, _} -> Err;
{error, noproc} ->
%% not started yet, so the group cannot exist
{error, not_found};
{error, _} = Err -> Err;
{timeout, _} -> {error, timeout}
end.
-spec overview(state()) -> map().
overview(undefined) ->
undefined;
overview(#?STATE{groups = Groups}) ->
GroupsOverview =
maps:map(fun(_,
#group{consumers = Consumers, partition_index = Idx}) ->
#{num_consumers => length(Consumers),
partition_index => Idx}
end,
Groups),
#{num_groups => map_size(Groups), groups => GroupsOverview}.
-spec init_state() -> state().
init_state() ->
#?STATE{groups = #{}, pids_groups = #{}}.
-spec apply(command(), state()) ->
{state(), term(), ra_machine:effects()}.
apply(#command_register_consumer{vhost = VirtualHost,
stream = Stream,
partition_index = PartitionIndex,
consumer_name = ConsumerName,
connection_pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId},
#?STATE{groups = StreamGroups0} = State) ->
StreamGroups1 =
maybe_create_group(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
StreamGroups0),
do_register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
State#?STATE{groups = StreamGroups1});
apply(#command_unregister_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
connection_pid = ConnectionPid,
subscription_id = SubscriptionId},
#?STATE{groups = StreamGroups0} = State0) ->
{State1, Effects1} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
{State0, []};
Group0 ->
{Group1, Effects} =
case lookup_consumer(ConnectionPid, SubscriptionId, Group0)
of
{value, Consumer} ->
G1 = remove_from_group(Consumer, Group0),
handle_consumer_removal(G1, Stream, ConsumerName, Consumer#consumer.active);
false ->
{Group0, []}
end,
SGS = update_groups(VirtualHost,
Stream,
ConsumerName,
Group1,
StreamGroups0),
{State0#?STATE{groups = SGS}, Effects}
end,
{State1, ok, Effects1};
apply(#command_activate_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName},
#?STATE{groups = StreamGroups0} = State0) ->
{G, Eff} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]),
{undefined, []};
Group ->
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group),
Group1 = update_consumer_state_in_group(Group, Pid, SubId, true),
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
{State0#?STATE{groups = StreamGroups1}, ok, Eff}.
-spec consumer_groups(binary(), [atom()], state()) -> {ok, [term()]}.
consumer_groups(VirtualHost, InfoKeys, #?STATE{groups = Groups}) ->
Res = maps:fold(fun ({VH, Stream, Reference},
#group{consumers = Consumers,
partition_index = PartitionIndex},
Acc)
when VH == VirtualHost ->
Record =
lists:foldr(fun (stream, RecAcc) ->
[{stream, Stream} | RecAcc];
(reference, RecAcc) ->
[{reference, Reference}
| RecAcc];
(partition_index, RecAcc) ->
[{partition_index,
PartitionIndex}
| RecAcc];
(consumers, RecAcc) ->
[{consumers,
length(Consumers)}
| RecAcc];
(Unknown, RecAcc) ->
[{Unknown, unknown_field}
| RecAcc]
end,
[], InfoKeys),
[Record | Acc];
(_GroupId, _Group, Acc) ->
Acc
end,
[], Groups),
{ok, lists:reverse(Res)}.
-spec group_consumers(binary(),
binary(),
binary(),
[atom()],
state()) ->
{ok, [term()]} | {error, not_found}.
group_consumers(VirtualHost,
Stream,
Reference,
InfoKeys,
#?STATE{groups = Groups}) ->
GroupId = {VirtualHost, Stream, Reference},
case Groups of
#{GroupId := #group{consumers = Consumers}} ->
Cs = lists:foldr(fun(#consumer{subscription_id = SubId,
owner = Owner,
active = Active},
Acc) ->
Record =
lists:foldr(fun (subscription_id, RecAcc) ->
[{subscription_id,
SubId}
| RecAcc];
(connection_name, RecAcc) ->
[{connection_name,
Owner}
| RecAcc];
(state, RecAcc)
when Active ->
[{state, active}
| RecAcc];
(state, RecAcc) ->
[{state, inactive}
| RecAcc];
(Unknown, RecAcc) ->
[{Unknown,
unknown_field}
| RecAcc]
end,
[], InfoKeys),
[Record | Acc]
end,
[], Consumers),
{ok, Cs};
_ ->
{error, not_found}
end.
-spec ensure_monitors(command(),
state(),
map(),
ra_machine:effects()) ->
{state(), map(), ra_machine:effects()}.
ensure_monitors(#command_register_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
connection_pid = Pid},
#?STATE{pids_groups = PidsGroups0} = State0,
Monitors0,
Effects) ->
GroupId = {VirtualHost, Stream, ConsumerName},
Groups0 = maps:get(Pid, PidsGroups0, #{}),
PidsGroups1 =
maps:put(Pid, maps:put(GroupId, true, Groups0), PidsGroups0),
{State0#?STATE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac},
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
connection_pid = Pid},
#?STATE{groups = StreamGroups0, pids_groups = PidsGroups0} =
State0,
Monitors,
Effects)
when is_map_key(Pid, PidsGroups0) ->
GroupId = {VirtualHost, Stream, ConsumerName},
#{Pid := PidGroup0} = PidsGroups0,
PidGroup1 =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
%% group is gone, can be removed from the PID map
maps:remove(GroupId, PidGroup0);
Group ->
%% group still exists, check if other consumers are from this PID
%% if yes, don't change the PID set
%% if no, remove group from PID set
case has_consumers_from_pid(Group, Pid) of
true ->
%% the group still depends on this PID, keep the group entry in the set
PidGroup0;
false ->
%% the group does not depend on the PID anymore, remove the group entry from the map
maps:remove(GroupId, PidGroup0)
end
end,
case maps:size(PidGroup1) == 0 of
true ->
%% no more groups depend on the PID
%% remove PID from data structure and demonitor it
{State0#?STATE{pids_groups = maps:remove(Pid, PidsGroups0)},
maps:remove(Pid, Monitors), [{demonitor, process, Pid} | Effects]};
false ->
%% one or more groups still depend on the PID
{State0#?STATE{pids_groups =
maps:put(Pid, PidGroup1, PidsGroups0)},
Monitors, Effects}
end;
ensure_monitors(_, #?STATE{} = State0, Monitors, Effects) ->
{State0, Monitors, Effects}.
-spec handle_connection_down(connection_pid(), state()) ->
{state(), ra_machine:effects()}.
handle_connection_down(Pid,
#?STATE{pids_groups = PidsGroups0} = State0) ->
case maps:take(Pid, PidsGroups0) of
error ->
{State0, []};
{Groups, PidsGroups1} ->
State1 = State0#?STATE{pids_groups = PidsGroups1},
maps:fold(fun(G, _, Acc) ->
handle_group_after_connection_down(Pid, Acc, G)
end, {State1, []}, Groups)
end.
handle_group_after_connection_down(Pid,
{#?STATE{groups = Groups0} = S0, Eff0},
{VirtualHost, Stream, ConsumerName}) ->
case lookup_group(VirtualHost,
Stream,
ConsumerName,
Groups0) of
undefined ->
{S0, Eff0};
#group{consumers = Consumers0} = G0 ->
%% remove the connection consumers from the group state
%% keep flags to know what happened
{Consumers1, ActiveRemoved, AnyRemoved} =
lists:foldl(
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
{L, S or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
end, {[], false, false}, Consumers0),
case AnyRemoved of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Effects} = handle_consumer_removal(G1, Stream, ConsumerName, ActiveRemoved),
Groups1 = update_groups(VirtualHost,
Stream,
ConsumerName,
G2,
Groups0),
{S0#?STATE{groups = Groups1}, Effects ++ Eff0};
false ->
{S0, Eff0}
end
end.
-spec state_to_map(state()) -> map().
state_to_map(#?STATE{groups = Groups, pids_groups = PidsGroups}) ->
#{<<"groups">> => groups_to_map(Groups),
<<"pids_groups">> => pids_groups_to_map(PidsGroups)}.
groups_to_map(Groups) when is_map(Groups) ->
maps:fold(fun(K, V, Acc) ->
Acc#{K => group_to_map(V)}
end, #{}, Groups).
pids_groups_to_map(PidsGroups) when is_map(PidsGroups) ->
PidsGroups.
group_to_map(#group{consumers = Consumers, partition_index = Index}) ->
OutConsumers = lists:foldl(fun(C, Acc) ->
Acc ++ [consumer_to_map(C)]
end, [], Consumers),
#{<<"consumers">> => OutConsumers, <<"partition_index">> => Index}.
consumer_to_map(#consumer{pid = Pid, subscription_id = SubId,
owner = Owner, active = Active}) ->
#{<<"pid">> => Pid, <<"subscription_id">> => SubId,
<<"owner">> => Owner, <<"active">> => Active}.
do_register_consumer(VirtualHost,
Stream,
-1 = _PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
#?STATE{groups = StreamGroups0} = State) ->
Group0 =
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
Consumer =
case lookup_active_consumer(Group0) of
{value, _} ->
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = false};
false ->
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
owner = Owner,
active = true}
end,
Group1 = add_to_group(Consumer, Group0),
StreamGroups1 =
update_groups(VirtualHost,
Stream,
ConsumerName,
Group1,
StreamGroups0),
#consumer{active = Active} = Consumer,
Effects =
case Active of
true ->
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, Active)];
_ ->
[]
end,
{State#?STATE{groups = StreamGroups1}, {ok, Active}, Effects};
do_register_consumer(VirtualHost,
Stream,
_PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
#?STATE{groups = StreamGroups0} = State) ->
Group0 =
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
{Group1, Effects} =
case Group0 of
#group{consumers = []} ->
%% first consumer in the group, it's the active one
Consumer0 =
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = true},
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, true)]};
_G ->
%% whatever the current state is, the newcomer will be passive
Consumer0 =
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = false},
G1 = add_to_group(Consumer0, Group0),
case lookup_active_consumer(G1) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
CurrentActive} ->
case evaluate_active_consumer(G1) of
CurrentActive ->
%% the current active stays the same
{G1, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid,
ActSubId,
Stream,
ConsumerName,
false,
true)]}
end;
false ->
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{G1, []}
end
end,
StreamGroups1 =
update_groups(VirtualHost,
Stream,
ConsumerName,
Group1,
StreamGroups0),
{value, #consumer{active = Active}} =
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?STATE{groups = StreamGroups1}, {ok, Active}, Effects}.
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
{G, []};
handle_consumer_removal(#group{partition_index = -1} = Group0,
Stream, ConsumerName, ActiveRemoved) ->
case ActiveRemoved of
true ->
%% this is the active consumer we remove, computing the new one
Group1 = compute_active_consumer(Group0),
case lookup_active_consumer(Group1) of
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
%% creating the side effect to notify the new active consumer
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
_ ->
%% no active consumer found in the group, nothing to do
{Group1, []}
end;
false ->
%% not the active consumer, nothing to do.
{Group0, []}
end;
handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
case lookup_active_consumer(Group0) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
CurrentActive} ->
case evaluate_active_consumer(Group0) of
CurrentActive ->
%% the current active stays the same
{Group0, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
end;
false ->
case ActiveRemoved of
true ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
{update_consumer_state_in_group(Group0, P, SID, true),
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]};
false ->
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{Group0, []}
end
end.
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = SteppingDown) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active,
stepping_down => SteppingDown}}).
maybe_create_group(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
StreamGroups) ->
case StreamGroups of
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
StreamGroups;
SGS ->
maps:put({VirtualHost, Stream, ConsumerName},
#group{consumers = [], partition_index = PartitionIndex},
SGS)
end.
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups,
undefined).
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = Consumers ++ [Consumer]}.
remove_from_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = lists:delete(Consumer, Consumers)}.
has_consumers_from_pid(#group{consumers = Consumers}, Pid) ->
lists:any(fun (#consumer{pid = P}) when P == Pid ->
true;
(_) ->
false
end,
Consumers).
compute_active_consumer(#group{consumers = Crs,
partition_index = -1} =
Group)
when length(Crs) == 0 ->
Group;
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0]} =
Group0) ->
Consumer1 = Consumer0#consumer{active = true},
Group0#group{consumers = [Consumer1]};
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0 | T]} =
Group0) ->
Consumer1 = Consumer0#consumer{active = true},
Consumers = lists:map(fun(C) -> C#consumer{active = false} end, T),
Group0#group{consumers = [Consumer1] ++ Consumers}.
evaluate_active_consumer(#group{partition_index = PartitionIndex,
consumers = Consumers})
when PartitionIndex >= 0 ->
ActiveConsumerIndex = PartitionIndex rem length(Consumers),
lists:nth(ActiveConsumerIndex + 1, Consumers).
lookup_consumer(ConnectionPid, SubscriptionId,
#group{consumers = Consumers}) ->
lists:search(fun(#consumer{pid = ConnPid, subscription_id = SubId}) ->
ConnPid == ConnectionPid andalso SubId == SubscriptionId
end,
Consumers).
lookup_active_consumer(#group{consumers = Consumers}) ->
lists:search(fun(#consumer{active = Active}) -> Active end,
Consumers).
update_groups(_VirtualHost,
_Stream,
_ConsumerName,
undefined,
StreamGroups) ->
StreamGroups;
update_groups(VirtualHost,
Stream,
ConsumerName,
#group{consumers = []},
StreamGroups) ->
%% the group is now empty, removing the key
maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups);
update_groups(VirtualHost,
Stream,
ConsumerName,
Group,
StreamGroups) ->
maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups).
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
Pid,
SubId,
NewState) ->
CS1 = lists:map(fun(C0) ->
case C0 of
#consumer{pid = Pid, subscription_id = SubId} ->
C0#consumer{active = NewState};
C -> C
end
end,
Consumers0),
G#group{consumers = CS1}.
mod_call_effect(Pid, Msg) ->
{mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}.
-spec send_message(pid(), term()) -> ok.
send_message(ConnectionPid, Msg) ->
ConnectionPid ! Msg,
ok.


@@ -0,0 +1,58 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-type vhost() :: binary().
-type partition_index() :: integer().
-type stream() :: binary().
-type consumer_name() :: binary().
-type connection_pid() :: pid().
-type subscription_id() :: byte().
-type group_id() :: {vhost(), stream(), consumer_name()}.
-type owner() :: binary().
-record(consumer,
{pid :: pid(),
subscription_id :: subscription_id(),
owner :: owner(), %% just a label
active :: boolean()}).
-record(group,
{consumers :: [#consumer{}], partition_index :: integer()}).
-record(rabbit_stream_sac_coordinator,
{groups :: #{group_id() => #group{}},
pids_groups ::
#{connection_pid() =>
#{group_id() => true}}, %% inner map acts as a set
%% future extensibility
reserved_1,
reserved_2}).
%% commands
-record(command_register_consumer,
{vhost :: vhost(),
stream :: stream(),
partition_index :: partition_index(),
consumer_name :: consumer_name(),
connection_pid :: connection_pid(),
owner :: owner(),
subscription_id :: subscription_id()}).
-record(command_unregister_consumer,
{vhost :: vhost(),
stream :: stream(),
consumer_name :: consumer_name(),
connection_pid :: connection_pid(),
subscription_id :: subscription_id()}).
-record(command_activate_consumer,
{vhost :: vhost(), stream :: stream(),
consumer_name :: consumer_name()}).

View File

@@ -1363,7 +1363,7 @@ delete_replica_leader(_) ->
     ok.

 overview(_Config) ->
-    S0 = rabbit_stream_coordinator:init(undefined),
+    S0 = rabbit_stream_coordinator:init(#{machine_version => 5}),
     O0 = rabbit_stream_coordinator:overview(S0),
     ?assertMatch(#{num_monitors := 0,
                    num_streams := 0,

File diff suppressed because it is too large

View File

@ -0,0 +1,593 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_sac_coordinator_v4_SUITE).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit/src/rabbit_stream_sac_coordinator_v4.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
-define(STATE, rabbit_stream_sac_coordinator).
-define(MOD, rabbit_stream_sac_coordinator_v4).
all() ->
[{group, tests}].
%% replicate eunit like test resolution
all_tests() ->
[F
|| {F, _} <- ?MODULE:module_info(functions),
re:run(atom_to_list(F), "_test$") /= nomatch].
groups() ->
[{tests, [], all_tests()}].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
ok = meck:new(rabbit_feature_flags),
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
Config.
end_per_testcase(_TestCase, _Config) ->
meck:unload(),
ok.
simple_sac_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
ConnectionPid = self(),
GroupId = {<<"/">>, Stream, ConsumerName},
Command0 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
State0 = state(),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers1}}} =
State1,
{ok, Active1}, Effects1} =
?MOD:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers2}}} =
State2,
{ok, Active2}, Effects2} =
?MOD:apply(Command1, State1),
?assertNot(Active2),
?assertEqual([consumer(ConnectionPid, 0, true),
consumer(ConnectionPid, 1, false)],
Consumers2),
assertEmpty(Effects2),
Command2 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 2),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
State3,
{ok, Active3}, Effects3} =
?MOD:apply(Command2, State2),
?assertNot(Active3),
?assertEqual([consumer(ConnectionPid, 0, true),
consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, false)],
Consumers3),
assertEmpty(Effects3),
Command3 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 0),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers4}}} =
State4,
ok, Effects4} =
?MOD:apply(Command3, State3),
?assertEqual([consumer(ConnectionPid, 1, true),
consumer(ConnectionPid, 2, false)],
Consumers4),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
Command4 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers5}}} =
State5,
ok, Effects5} =
?MOD:apply(Command4, State4),
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
Command5 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
{#?STATE{groups = Groups6}, ok, Effects6} =
?MOD:apply(Command5, State5),
assertEmpty(Groups6),
assertEmpty(Effects6),
ok.
super_stream_partition_sac_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
ConnectionPid = self(),
GroupId = {<<"/">>, Stream, ConsumerName},
Command0 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 0),
State0 = state(),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers1}}} =
State1,
{ok, Active1}, Effects1} =
?MOD:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers2}}} =
State2,
{ok, Active2}, Effects2} =
?MOD:apply(Command1, State1),
%% never active on registration
?assertNot(Active2),
%% all consumers inactive, until the former active one steps down and activates the new consumer
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, false)],
Consumers2),
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),
Command2 = activate_consumer_command(Stream, ConsumerName),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
State3,
ok, Effects3} =
?MOD:apply(Command2, State2),
%% 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, true)],
Consumers3),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
Command3 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers4}}} =
State4,
{ok, Active4}, Effects4} =
?MOD:apply(Command3, State3),
%% never active on registration
?assertNot(Active4),
%% 1 (partition index) % 3 (consumer count) = 1 (active consumer index)
%% the active consumer stays the same
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, true),
consumer(ConnectionPid, 2, false)],
Consumers4),
assertEmpty(Effects4),
Command4 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 0),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers5}}} =
State5,
ok, Effects5} =
?MOD:apply(Command4, State4),
%% 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
%% the active consumer will move from sub 1 to sub 2
?assertEqual([consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, false)],
Consumers5),
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),
Command5 = activate_consumer_command(Stream, ConsumerName),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
State6,
ok, Effects6} =
?MOD:apply(Command5, State5),
?assertEqual([consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, true)],
Consumers6),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
Command6 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers7}}} =
State7,
ok, Effects7} =
?MOD:apply(Command6, State6),
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers7),
assertEmpty(Effects7),
Command7 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
{#?STATE{groups = Groups8}, ok, Effects8} =
?MOD:apply(Command7, State7),
assertEmpty(Groups8),
assertEmpty(Effects8),
ok.
ensure_monitors_test(_) ->
GroupId = {<<"/">>, <<"stream">>, <<"app">>},
Group =
cgroup([consumer(self(), 0, true), consumer(self(), 1, false)]),
State0 = state(#{GroupId => Group}),
Monitors0 = #{},
Command0 =
register_consumer_command(<<"stream">>, -1, <<"app">>, self(), 0),
{#?STATE{pids_groups = PidsGroups1} = State1, Monitors1, Effects1} =
?MOD:ensure_monitors(Command0,
State0,
Monitors0,
[]),
assertSize(1, PidsGroups1),
assertSize(1, maps:get(self(), PidsGroups1)),
?assertEqual(#{self() => sac}, Monitors1),
?assertEqual([{monitor, process, self()}, {monitor, node, node()}],
Effects1),
Command1 =
register_consumer_command(<<"stream">>, -1, <<"app">>, self(), 1),
{#?STATE{pids_groups = PidsGroups2} = State2, Monitors2, Effects2} =
?MOD:ensure_monitors(Command1,
State1,
Monitors1,
[]),
assertSize(1, PidsGroups2),
assertSize(1, maps:get(self(), PidsGroups2)),
?assertEqual(#{self() => sac}, Monitors2),
?assertEqual([{monitor, process, self()}, {monitor, node, node()}],
Effects2),
Group2 = cgroup([consumer(self(), 1, true)]),
Command2 =
unregister_consumer_command(<<"stream">>, <<"app">>, self(), 0),
{#?STATE{pids_groups = PidsGroups3} = State3, Monitors3, Effects3} =
?MOD:ensure_monitors(Command2,
State2#?STATE{groups =
#{GroupId
=>
Group2}},
Monitors2,
[]),
assertSize(1, PidsGroups3),
assertSize(1, maps:get(self(), PidsGroups3)),
?assertEqual(#{self() => sac}, Monitors3),
?assertEqual([], Effects3),
%% trying with an unknown connection PID
%% the function should not change anything
UnknownConnectionPid = spawn(fun() -> ok end),
PassthroughCommand =
unregister_consumer_command(<<"stream">>,
<<"app">>,
UnknownConnectionPid,
0),
{State3, Monitors3, Effects3} =
?MOD:ensure_monitors(PassthroughCommand,
State3,
Monitors3,
[]),
Command3 =
unregister_consumer_command(<<"stream">>, <<"app">>, self(), 1),
{#?STATE{pids_groups = PidsGroups4} = _State4, Monitors4, Effects4} =
?MOD:ensure_monitors(Command3,
State3#?STATE{groups =
#{}},
Monitors3,
[]),
assertEmpty(PidsGroups4),
assertEmpty(Monitors4),
?assertEqual([{demonitor, process, self()}], Effects4),
ok.
handle_connection_down_sac_should_get_activated_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid0, 0, true),
consumer(Pid1, 1, false),
consumer(Pid0, 2, false)]),
State0 = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
Effects1} =
?MOD:handle_connection_down(Pid0, State0),
assertSize(1, PidsGroups1),
assertSize(1, maps:get(Pid1, PidsGroups1)),
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, true)]), Groups1),
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
Effects2} =
?MOD:handle_connection_down(Pid1, State1),
assertEmpty(PidsGroups2),
assertEmpty(Effects2),
assertEmpty(Groups2),
ok.
handle_connection_down_sac_active_does_not_change_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid0, 1, false),
consumer(Pid0, 2, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups),
ok.
handle_connection_down_sac_no_more_consumers_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Group = cgroup([consumer(Pid0, 0, true),
consumer(Pid0, 1, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertEmpty(PidsGroups),
assertEmpty(Groups),
assertEmpty(Effects),
ok.
handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid1, 1, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]), %% should not be there
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]),
Groups),
ok.
handle_connection_down_super_stream_active_stays_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid1, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid0, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]),
Groups),
ok.
handle_connection_down_super_stream_active_changes_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid1, 1, true),
consumer(Pid0, 2, false),
consumer(Pid1, 3, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]),
Groups),
ok.
handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]),
Groups),
ok.
handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
%% this is a weird case that should not happen in the wild,
%% we test the logic in the code nevertheless.
%% No active consumer in the group
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, false),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups, groups = Groups},
Effects} =
?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]),
Groups),
ok.
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->
?assertEqual(Expected, maps:size(Map));
assertSize(Expected, List) when is_list(List) ->
?assertEqual(Expected, length(List)).
assertEmpty(Data) ->
assertSize(0, Data).
assertHasGroup(GroupId, Group, Groups) ->
?assertEqual(#{GroupId => Group}, Groups).
consumer(Pid, SubId, Active) ->
#consumer{pid = Pid,
subscription_id = SubId,
owner = <<"owning connection label">>,
active = Active}.
cgroup(Consumers) ->
cgroup(-1, Consumers).
cgroup(PartitionIndex, Consumers) ->
#group{partition_index = PartitionIndex, consumers = Consumers}.
state() ->
state(#{}).
state(Groups) ->
state(Groups, #{}).
state(Groups, PidsGroups) ->
#?STATE{groups = Groups, pids_groups = PidsGroups}.
register_consumer_command(Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubId) ->
#command_register_consumer{vhost = <<"/">>,
stream = Stream,
partition_index = PartitionIndex,
consumer_name = ConsumerName,
connection_pid = ConnectionPid,
owner = <<"owning connection label">>,
subscription_id = SubId}.
unregister_consumer_command(Stream,
ConsumerName,
ConnectionPid,
SubId) ->
#command_unregister_consumer{vhost = <<"/">>,
stream = Stream,
consumer_name = ConsumerName,
connection_pid = ConnectionPid,
subscription_id = SubId}.
activate_consumer_command(Stream, ConsumerName) ->
#command_activate_consumer{vhost = <<"/">>,
stream = Stream,
consumer_name = ConsumerName}.
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}
}]},
Effect).
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => false,
stepping_down => true}}]},
Effect).

View File

@@ -18,6 +18,9 @@
 connect(Config, Node) ->
     StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
+    connect(StreamPort).
+
+connect(StreamPort) ->
     {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
     C0 = rabbit_stream_core:init(0),
@@ -71,8 +74,14 @@ delete_publisher(Sock, C0, PublisherId) ->
     {{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
     {ok, C1}.

 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
-    SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
+    subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
+    Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
+           InitialCredit, Props},
+    SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
     {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
     {ok, C1}.

View File

@ -0,0 +1,77 @@
# Stream Coordinator
## Single Active Consumer
### "Simple" SAC (Not Super Stream)
```mermaid
sequenceDiagram
participant C as Coordinator
participant C1 as Connection 1
participant C2 as Connection 2
participant C3 as Connection 3
Note over C,C3: Simple SAC (not super stream)
C1->>C: register sub 1
C-)C1: {sac, sub 1, active = true}
activate C1
C1->>C1: consumer update to client
C2->>C: register sub 2
C3->>C: register sub 3
C1->>C: unregister sub 1
deactivate C1
C-)C2: {sac, sub 2, active = true}
activate C2
C2->>C2: consumer update to client
deactivate C2
```
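
A consumer only has to opt in at subscription time; the coordinator then drives activation. Below is a minimal sketch using the `stream_test_utils` helpers extended in this commit; the stream name, consumer name and credit value are illustrative.

```erlang
%% Minimal sketch; stream name, consumer name and credit are illustrative.
simple_sac_example(Config) ->
    Stream = <<"invoices">>,
    Props = #{<<"single-active-consumer">> => <<"true">>,
              <<"name">> => <<"app">>},
    {ok, S, C0} = stream_test_utils:connect(Config, 0),
    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
    %% subscription id 0, credit 10; the coordinator decides which member
    %% of the group becomes active and notifies it with a {sac, ...} message
    {ok, C2} = stream_test_utils:subscribe(S, C1, Stream, 0, 10, Props),
    {ok, _} = stream_test_utils:close(S, C2).
```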
### SAC with Super Stream Partition
```mermaid
sequenceDiagram
participant C as Coordinator
participant C1 as Connection 1
participant C2 as Connection 2
participant C3 as Connection 3
Note over C,C3: Super Stream SAC (partition = 1)
C1->>C: register sub 1
C-)C1: {sac, sub 1, active = true}
activate C1
C2->>C: register sub 2
C-)C1: {sac, sub 1, active = false, step down = true}
deactivate C1
C1->>C1: consumer update to client
C1->>C: activate consumer in group
C-)C2: {sac, sub 2, active = true}
activate C2
C2->>C2: consumer update to client
C3->>C: register sub 3
Note over C, C3: active consumer stays the same (partition % consumers = 1 % 3 = 1)
deactivate C2
```
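
The selection rule applied here is simply `partition index rem number of consumers`, as in the coordinator's `evaluate_active_consumer/1` shown earlier in this diff. A standalone sketch (function name illustrative):

```erlang
%% Sketch of active consumer selection for a super stream partition:
%% active index = partition index rem number of consumers in the group.
pick_active(PartitionIndex, Consumers)
  when PartitionIndex >= 0, Consumers =/= [] ->
    lists:nth((PartitionIndex rem length(Consumers)) + 1, Consumers).
%% pick_active(1, [Sub0, Sub1, Sub2]) returns Sub1, matching the
%% "1 % 3 = 1" note in the diagram above.
```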
### `noconnection` management
```mermaid
flowchart TB
A(monitor) --noconnection--> B(status = disconnected, set up timer)
B -. timeout .-> C(status = forgotten)
B -. nodeup .-> D(reissue monitors, send msg to connections)
D -. down .-> E(handle connection down)
D -. connection response .-> F(evaluate impacted groups)
```
* composite status for consumers: `{connected, active}`, `{disconnected, active}`, etc.
* `disconnected` status can prevent rebalancing in a group, e.g. `{disconnected, active}` (it is impossible to tell the active consumer to step down); see the sketch after this list
* consumers in `forgotten` status are ignored during rebalancing
* it may be necessary to reconcile a group if a `{forgotten, active}` consumer comes back in a group ("evaluate impacted groups" box above).
This is unlikely though.
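
A minimal sketch of how these rules can be expressed against the `#consumer{}` record from `rabbit_stream_sac_coordinator.hrl`. The helper names are illustrative, not the coordinator's actual functions; `presumed_down` is the connectivity value the coordinator and the test suites use for the `forgotten` state above.

```erlang
%% Illustrative helpers only; the real logic lives in
%% rabbit_stream_sac_coordinator.erl.
-include_lib("rabbit/src/rabbit_stream_sac_coordinator.hrl").

%% No rebalancing while the active consumer is disconnected: it cannot
%% be told to step down.
can_rebalance(#group{consumers = Consumers}) ->
    not lists:any(fun(#consumer{status = {disconnected, active}}) -> true;
                     (_) -> false
                  end, Consumers).

%% Presumed-down ("forgotten") consumers are ignored when choosing a
%% new active consumer.
rebalancing_candidates(#group{consumers = Consumers}) ->
    [C || #consumer{status = {Conn, _}} = C <- Consumers,
          Conn =/= presumed_down].
```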
### Stale Node Detection
```mermaid
flowchart TB
A(RA) -- tick --> B(stale nodes = RA known nodes - cluster nodes)
B -. no stale nodes .-> C(nothing to do)
B -. stale nodes .-> D(remove connections from state)
```
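
The computation behind this flowchart is a set difference between the nodes RA still references and the current cluster members; a sketch (function and variable names illustrative):

```erlang
%% Nodes RA still knows about but that left the cluster are considered
%% stale; connections hosted on them are removed from the SAC state.
%% e.g. stale_nodes([n1, n2, n3], [n1, n2]) =:= [n3]
stale_nodes(RaKnownNodes, ClusterNodes) ->
    RaKnownNodes -- ClusterNodes.
```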

View File

@@ -720,6 +720,9 @@ open(info, {OK, S, Data},
                 StatemData#statem_data{connection = Connection1,
                                        connection_state = State2}}
     end;
+open(info, {sac, check_connection, _}, State) ->
+    rabbit_stream_sac_coordinator:connection_reconnected(self()),
+    {keep_state, State};
 open(info,
      {sac, #{subscription_id := SubId,
              active := Active} = Msg},

View File

@@ -378,7 +378,7 @@ list_consumer_groups_run(Config) ->
     {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
     StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
-    {S, C} = start_stream_connection(StreamPort),
+    {S, C0} = start_stream_connection(StreamPort),
     ?awaitMatch(1, connection_count(Config), ?WAIT),
     ConsumerReference = <<"foo">>,
@@ -387,11 +387,11 @@ list_consumer_groups_run(Config) ->
                       <<"name">> => ConsumerReference},
     Stream1 = <<"list_consumer_groups_run_1">>,
-    create_stream(S, Stream1, C),
-    subscribe(S, 0, Stream1, SubProperties, C),
-    handle_consumer_update(S, C, 0),
-    subscribe(S, 1, Stream1, SubProperties, C),
-    subscribe(S, 2, Stream1, SubProperties, C),
+    C1 = create_stream(S, Stream1, C0),
+    C2 = subscribe(S, 0, Stream1, SubProperties, C1),
+    C3 = handle_consumer_update(S, C2, 0),
+    C4 = subscribe(S, 1, Stream1, SubProperties, C3),
+    C5 = subscribe(S, 2, Stream1, SubProperties, C4),
     ?awaitMatch(3, consumer_count(Config), ?WAIT),
@@ -399,11 +399,11 @@ list_consumer_groups_run(Config) ->
     assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
     Stream2 = <<"list_consumer_groups_run_2">>,
-    create_stream(S, Stream2, C),
-    subscribe(S, 3, Stream2, SubProperties, C),
-    handle_consumer_update(S, C, 3),
-    subscribe(S, 4, Stream2, SubProperties, C),
-    subscribe(S, 5, Stream2, SubProperties, C),
+    C6 = create_stream(S, Stream2, C5),
+    C7 = subscribe(S, 3, Stream2, SubProperties, C6),
+    C8 = handle_consumer_update(S, C7, 3),
+    C9 = subscribe(S, 4, Stream2, SubProperties, C8),
+    C10 = subscribe(S, 5, Stream2, SubProperties, C9),
     ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
@@ -411,10 +411,10 @@ list_consumer_groups_run(Config) ->
     assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
     assertConsumerGroup(Stream2, ConsumerReference, -1, 3, CG2),
-    delete_stream(S, Stream1, C),
-    delete_stream(S, Stream2, C),
-    close(S, C),
+    C11 = delete_stream(S, Stream1, C10),
+    C12 = delete_stream(S, Stream2, C11),
+    close(S, C12),
     {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
     ok.
@@ -490,9 +490,9 @@ list_group_consumers_run(Config) ->
     {ok, Consumers1} =
         ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
-    ?assertEqual([[{subscription_id, 0}, {state, active}],
-                  [{subscription_id, 1}, {state, inactive}],
-                  [{subscription_id, 2}, {state, inactive}]],
+    ?assertEqual([[{subscription_id, 0}, {state, "active (connected)"}],
+                  [{subscription_id, 1}, {state, "waiting (connected)"}],
+                  [{subscription_id, 2}, {state, "waiting (connected)"}]],
                  Consumers1),
     Stream2 = <<"list_group_consumers_run_2">>,
@@ -510,9 +510,9 @@ list_group_consumers_run(Config) ->
     {ok, Consumers2} =
         ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
-    ?assertEqual([[{subscription_id, 3}, {state, active}],
-                  [{subscription_id, 4}, {state, inactive}],
-                  [{subscription_id, 5}, {state, inactive}]],
+    ?assertEqual([[{subscription_id, 3}, {state, "active (connected)"}],
+                  [{subscription_id, 4}, {state, "waiting (connected)"}],
+                  [{subscription_id, 5}, {state, "waiting (connected)"}]],
                  Consumers2),
     delete_stream(S, Stream1, C),

View File

@@ -596,35 +596,23 @@ max_segment_size_bytes_validation(Config) ->
     ok.

 close_connection_on_consumer_update_timeout(Config) ->
-    Transport = gen_tcp,
-    Port = get_stream_port(Config),
-    {ok, S} =
-        Transport:connect("localhost", Port,
-                          [{active, false}, {mode, binary}]),
-    C0 = rabbit_stream_core:init(0),
-    C1 = test_peer_properties(Transport, S, C0),
-    C2 = test_authenticate(Transport, S, C1),
     Stream = atom_to_binary(?FUNCTION_NAME, utf8),
-    C3 = test_create_stream(Transport, S, Stream, C2),
+    {ok, S, C0} = stream_test_utils:connect(Config, 0),
+    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
     SubId = 42,
-    C4 = test_subscribe(Transport, S, SubId, Stream,
-                        #{<<"single-active-consumer">> => <<"true">>,
-                          <<"name">> => <<"foo">>},
-                        ?RESPONSE_CODE_OK,
-                        C3),
-    {Cmd, _C5} = receive_commands(Transport, S, C4),
+    Props = #{<<"single-active-consumer">> => <<"true">>,
+              <<"name">> => <<"foo">>},
+    {ok, C2} = stream_test_utils:subscribe(S, C1, Stream, SubId, 10, Props),
+    {Cmd, _C3} = receive_commands(S, C2),
     ?assertMatch({request, _, {consumer_update, SubId, true}}, Cmd),
-    closed = wait_for_socket_close(Transport, S, 10),
-    {ok, Sb} =
-        Transport:connect("localhost", Port,
-                          [{active, false}, {mode, binary}]),
-    Cb0 = rabbit_stream_core:init(0),
-    Cb1 = test_peer_properties(Transport, Sb, Cb0),
-    Cb2 = test_authenticate(Transport, Sb, Cb1),
-    Cb3 = test_delete_stream(Transport, Sb, Stream, Cb2, false),
-    _Cb4 = test_close(Transport, Sb, Cb3),
-    closed = wait_for_socket_close(Transport, Sb, 10),
+    closed = wait_for_socket_close(S, 10),
+    {ok, Sb, Cb0} = stream_test_utils:connect(Config, 0),
+    {ok, Cb1} = stream_test_utils:delete_stream(Sb, Cb0, Stream),
+    stream_test_utils:close(Sb, Cb1),
+    closed = wait_for_socket_close(Sb, 10),
     ok.

 set_filter_size(Config) ->
@@ -1606,6 +1594,9 @@ test_close(Transport, S, C0) ->
         receive_commands(Transport, S, C0),
     C.

+wait_for_socket_close(S, Attempt) ->
+    wait_for_socket_close(gen_tcp, S, Attempt).
+
 wait_for_socket_close(_Transport, _S, 0) ->
     not_closed;
 wait_for_socket_close(Transport, S, Attempt) ->
@@ -1616,6 +1607,10 @@ wait_for_socket_close(Transport, S, Attempt) ->
         closed
     end.

+receive_commands(S, C) ->
+    receive_commands(gen_tcp, S, C).
+
 receive_commands(Transport, S, C) ->
     stream_test_utils:receive_stream_commands(Transport, S, C).

View File

@ -0,0 +1,786 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_partitions_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include_lib("rabbit/src/rabbit_stream_sac_coordinator.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-define(NET_TICKTIME_S, 5).
-define(TRSPT, gen_tcp).
-define(CORR_ID, 1).
-define(SAC_STATE, rabbit_stream_sac_coordinator).
-record(node, {name :: node(), stream_port :: pos_integer()}).
all() ->
[{group, cluster}].
groups() ->
[{cluster, [],
[simple_sac_consumer_should_get_disconnected_on_network_partition,
simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition,
super_stream_sac_consumer_should_get_disconnected_on_network_partition,
super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition]}
].
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "mixed version clusters are not supported"};
_ ->
rabbit_ct_helpers:log_environment(),
Config
end.
end_per_suite(Config) ->
Config.
init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:run_setup_steps(
Config,
[fun rabbit_ct_broker_helpers:configure_dist_proxy/1]),
rabbit_ct_helpers:set_config(Config1,
[{rmq_nodename_suffix, Group},
{net_ticktime, ?NET_TICKTIME_S}]).
end_per_group(_, Config) ->
Config.
init_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, TestCase),
Config2 = rabbit_ct_helpers:set_config(
Config1, [{rmq_nodes_clustered, true},
{rmq_nodes_count, 3},
{tcp_ports_base}
]),
rabbit_ct_helpers:run_setup_steps(
Config2,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{aten,
[{poll_interval,
1000}]})
end,
fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{stream_cmd_timeout, 5000},
{stream_sac_disconnected_timeout,
2000}]})
end]
++ rabbit_ct_broker_helpers:setup_steps()).
end_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:testcase_finished(Config, TestCase),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:teardown_steps()).
simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
S = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
init_stream(Config, CL, S),
[L, F1, F2] = topology(Config, S),
%% the stream leader and the coordinator leader are on the same node
%% another node will be isolated
?assertEqual(L#node.name, coordinator_leader(Config)),
{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
C0_01 = register_sac(So0, C0_00, S, 0),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, S, 1),
C2_01 = register_sac(So2, C2_00, S, 2),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, S),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, LN, S),
wait_for_presumed_down_consumer(Config, LN, S),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_all_consumers_connected(Config, LN, S),
Consumers2 = query_consumers(Config, LN, S),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
?assertMatch([DisconnectedConsumer],
Consumers1 -- Consumers2),
%% assert the cancelled consumer received a metadata update frame
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
C1 = receive_metadata_update(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
delete_stream(stream_port(Config, 0), S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
[CF1, CF2] = all_nodes(Config) -- [CL],
S = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
init_stream(Config, CF1, S),
[L, _F1, _F2] = topology(Config, S),
%% the stream leader and the coordinator leader are not on the same node
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
C0_01 = register_sac(So0, C0_00, S, 0),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, S, 1),
C2_01 = register_sac(So2, C2_00, S, 2),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, S),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
%% N1 is the coordinator leader
Isolated = CL,
NotIsolated = CF1,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),
wait_for_disconnected_consumer(Config, NotIsolated, S),
wait_for_presumed_down_consumer(Config, NotIsolated, S),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, S),
Consumers2 = query_consumers(Config, NotIsolated, S),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
[#consumer{subscription_id = ActiveSubId}] =
lists:filter(fun(#consumer{status = St}) ->
St =:= {connected, active}
end, Consumers2),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
delete_stream(L#node.stream_port, S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
Ss = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Partition = init_super_stream(Config, CL, Ss, 1, CL),
[L, F1, F2] = topology(Config, Partition),
wait_for_coordinator_ready(Config),
%% we expect the stream leader and the coordinator leader to be on the same node
%% another node will be isolated
?assertEqual(L#node.name, CL),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
C0_01 = register_sac(So0, C0_00, Partition, 0, Ss),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, Partition, 1, Ss),
C2_01 = register_sac(So2, C2_00, Partition, 2, Ss),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, Partition),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
NotIsolated = F2N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
[CF1, _] = all_nodes(Config) -- [CL],
Ss = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Partition = init_super_stream(Config, CL, Ss, 2, CF1),
[L, F1, F2] = topology(Config, Partition),
wait_for_coordinator_ready(Config),
%% check stream leader and coordinator are not on the same node
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
C0_01 = register_sac(So0, C0_00, Partition, 0, Ss),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, Partition, 1, Ss),
%% former active gets de-activated
C0_03 = receive_consumer_update_and_respond(So0, C0_02),
%% gets activated
C1_02 = receive_consumer_update_and_respond(So1, C1_01),
C2_01 = register_sac(So2, C2_00, Partition, 2, Ss),
SubIdToState0 = #{0 => {So0, C0_03},
1 => {So1, C1_02},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, Partition),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
NotIsolated = F2N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
[#consumer{subscription_id = ActiveSubId}] =
lists:filter(fun(#consumer{status = St}) ->
St =:= {connected, active}
end, Consumers2),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
same_consumer(#consumer{owner = P1, subscription_id = Id1},
#consumer{owner = P2, subscription_id = Id2})
when P1 == P2 andalso Id1 == Id2 ->
true;
same_consumer(_, _) ->
false.
cluster_nodes(Config) ->
lists:map(fun(N) ->
#node{name = node_config(Config, N, nodename),
stream_port = stream_port(Config, N)}
end, lists:seq(0, node_count(Config) - 1)).
node_count(Config) ->
test_server:lookup_config(rmq_nodes_count, Config).
nodename(Config, N) ->
node_config(Config, N, nodename).
stream_port(Config, N) ->
node_config(Config, N, tcp_port_stream).
node_config(Config, N, K) ->
rabbit_ct_broker_helpers:get_node_config(Config, N, K).
topology(Config, St) ->
Members = stream_members(Config, St),
LN = leader(Members),
Nodes = cluster_nodes(Config),
[L] = lists:filter(fun(#node{name = N}) ->
N =:= LN
end, Nodes),
[F1, F2] = lists:filter(fun(#node{name = N}) ->
N =/= LN
end, Nodes),
[L, F1, F2].
leader(Members) ->
maps:fold(fun(Node, {_, writer}, _Acc) ->
Node;
(_, _, Acc) ->
Acc
end, undefined, Members).
stream_members(Config, Stream) ->
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [Stream, <<"/">>]),
#{name := StreamId} = amqqueue:get_type_state(Q),
State = rpc(Config, rabbit_stream_coordinator, state, []),
{ok, Members} = rpc(Config, rabbit_stream_coordinator, query_members,
[StreamId, State]),
Members.
init_coordinator(Config) ->
%% to make sure the coordinator is initialized
init_stream(Config, 0, <<"dummy">>),
delete_stream(stream_port(Config, 0), <<"dummy">>),
wait_for_coordinator_ready(Config).
init_stream(Config, N, St) ->
{ok, S, C0} = stream_test_utils:connect(stream_port(Config, N)),
{ok, C1} = stream_test_utils:create_stream(S, C0, St),
NC = node_count(Config),
wait_for_members(S, C1, St, NC),
{ok, _} = stream_test_utils:close(S, C1).
delete_stream(Port, St) ->
{ok, S, C0} = stream_test_utils:connect(Port),
{ok, C1} = stream_test_utils:delete_stream(S, C0, St),
{ok, _} = stream_test_utils:close(S, C1).
init_super_stream(Config, Node, Ss, PartitionIndex, ExpectedNode) ->
{ok, S, C0} = stream_test_utils:connect(Config, Node),
NC = node_count(Config),
Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)])
|| N <- lists:seq(0, NC - 1)],
Bks = [integer_to_binary(N) || N <- lists:seq(0, NC - 1)],
SsCreationFrame = request({create_super_stream, Ss, Partitions, Bks, #{}}),
ok = ?TRSPT:send(S, SsCreationFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
[wait_for_members(S, C1, P, NC) || P <- Partitions],
Partition = lists:nth(PartitionIndex, Partitions),
[#node{name = LN} | _] = topology(Config, Partition),
P = case LN of
ExpectedNode ->
Partition;
_ ->
enforce_stream_leader_on_node(Config, S, C1,
Partitions, Partition,
ExpectedNode, 10)
end,
{ok, _} = stream_test_utils:close(S, C1),
P.
enforce_stream_leader_on_node(_, _, _, _, _, _, 0) ->
ct:fail("could not create super stream partition on chosen node");
enforce_stream_leader_on_node(Config, S, C,
Partitions, Partition, Node, Count) ->
CL = coordinator_leader(Config),
NC = node_count(Config),
[begin
case P of
Partition ->
restart_stream(Config, CL, P, Node);
_ ->
restart_stream(Config, CL, P, undefined)
end,
wait_for_members(S, C, P, NC)
end || P <- Partitions],
[#node{name = LN} | _] = topology(Config, Partition),
case LN of
Node ->
Partition;
_ ->
timer:sleep(500),
enforce_stream_leader_on_node(Config, S, C,
Partitions, Partition, Node,
Count - 1)
end.
delete_super_stream(Port, Ss) ->
{ok, S, C0} = stream_test_utils:connect(Port),
SsDeletionFrame = request({delete_super_stream, Ss}),
ok = ?TRSPT:send(S, SsDeletionFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {delete_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
{ok, _} = stream_test_utils:close(S, C1).
register_sac(S, C0, St, SubId, SuperStream) ->
register_sac0(S, C0, St, SubId, #{<<"super-stream">> => SuperStream}).
register_sac(S, C0, St, SubId) ->
register_sac0(S, C0, St, SubId, #{}).
register_sac0(S, C0, St, SubId, Args) ->
SacSubscribeFrame = request({subscribe, SubId, St,
first, 1,
Args#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => name()}}),
ok = ?TRSPT:send(S, SacSubscribeFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {subscribe, ?RESPONSE_CODE_OK}},
Cmd1),
C1.
receive_consumer_update(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({request, _CorrId, {consumer_update, _SubId, _Status}},
Cmd),
C1.
receive_consumer_update_and_respond(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({request, _CorrId, {consumer_update, _SubId, _Status}},
Cmd),
{request, CorrId, {consumer_update, _SubId, _Status}} = Cmd,
Frame = response(CorrId, {consumer_update, ?RESPONSE_CODE_OK, first}),
ok = ?TRSPT:send(S, Frame),
C1.
receive_metadata_update(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({metadata_update, _, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE},
Cmd),
C1.
unsubscribe(S, C0) ->
{ok, C1} = stream_test_utils:unsubscribe(S, C0, sub_id()),
C1.
query_consumers(Config, Stream) ->
query_consumers(Config, 0, Stream).
query_consumers(Config, Node, Stream) ->
Key = group_key(Stream),
#?SAC_STATE{groups = #{Key := #group{consumers = Consumers}}} =
rpc(Config, Node, rabbit_stream_coordinator, sac_state, []),
Consumers.
all_nodes(Config) ->
lists:map(fun(N) ->
nodename(Config, N)
end, lists:seq(0, node_count(Config) - 1)).
coordinator_status(Config) ->
rpc(Config, rabbit_stream_coordinator, status, []).
coordinator_leader(Config) ->
Status = coordinator_status(Config),
case lists:search(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
RS == leader
end, Status) of
{value, Leader} ->
proplists:get_value(<<"Node Name">>, Leader, undefined);
_ ->
undefined
end.
restart_stream(Config, Node, S, undefined) ->
rpc(Config, Node, rabbit_stream_queue, restart_stream, [<<"/">>, S, #{}]);
restart_stream(Config, Node, S, Leader) ->
Opts = #{preferred_leader_node => Leader},
rpc(Config, Node, rabbit_stream_queue, restart_stream, [<<"/">>, S, Opts]).
rpc(Config, M, F, A) ->
rpc(Config, 0, M, F, A).
rpc(Config, Node, M, F, A) ->
rabbit_ct_broker_helpers:rpc(Config, Node, M, F, A).
group_key(Stream) ->
{<<"/">>, Stream, name()}.
request(Cmd) ->
request(?CORR_ID, Cmd).
request(CorrId, Cmd) ->
rabbit_stream_core:frame({request, CorrId, Cmd}).
response(CorrId, Cmd) ->
rabbit_stream_core:frame({response, CorrId, Cmd}).
receive_commands(S, C) ->
receive_commands(?TRSPT, S, C).
receive_commands(Transport, S, C) ->
stream_test_utils:receive_stream_commands(Transport, S, C).
sub_id() ->
0.
name() ->
<<"app">>.
wait_for_members(S, C, St, ExpectedCount) ->
T = ?TRSPT,
GetStreamNodes =
fun() ->
MetadataFrame = request({metadata, [St]}),
ok = gen_tcp:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(T, S, C),
{response, 1,
{metadata, _Nodes, #{St := {Leader = {_H, _P}, Replicas}}}} =
CmdMetadata,
[Leader | Replicas]
end,
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == ExpectedCount
end).
wait_for_disconnected_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
lists:any(fun(#consumer{status = {disconnected, _}}) ->
true;
(_) ->
false
end, Cs)
end).
wait_for_presumed_down_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
true;
(_) ->
false
end, Cs)
end).
wait_for_all_consumers_connected(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
lists:all(fun(#consumer{status = {connected, _}}) ->
true;
(_) ->
false
end, Cs)
end, 30_000).
wait_for_coordinator_ready(Config) ->
NC = node_count(Config),
rabbit_ct_helpers:await_condition(
fun() ->
Status = coordinator_status(Config),
lists:all(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
RS == leader orelse RS == follower
end, Status) andalso length(Status) == NC
end).
assertConsumersConnected(Consumers) when length(Consumers) > 0 ->
lists:foreach(fun(#consumer{status = St}) ->
?assertMatch({connected, _}, St,
"Consumer should be connected")
end, Consumers);
assertConsumersConnected(_) ->
?assert(false, "The consumer list is empty").
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->
?assertEqual(Expected, maps:size(Map));
assertSize(Expected, List) when is_list(List) ->
?assertEqual(Expected, length(List)).
assertEmpty(Data) ->
assertSize(0, Data).