rabbitmq-server/deps/rabbitmq_stream/test/rabbit_stream_partitions_SU...

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

901 lines
34 KiB
Erlang
Raw Permalink Normal View History

Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%%
-module(rabbit_stream_partitions_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include_lib("rabbit/src/rabbit_stream_sac_coordinator.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-define(NET_TICKTIME_S, 5).
-define(TRSPT, gen_tcp).
-define(CORR_ID, 1).
-define(SAC_STATE, rabbit_stream_sac_coordinator).
-record(node, {name :: node(), stream_port :: pos_integer()}).
all() ->
[{group, cluster}].
groups() ->
[{cluster, [],
[simple_sac_consumer_should_get_disconnected_on_network_partition,
simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition,
super_stream_sac_consumer_should_get_disconnected_on_network_partition,
super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition]}
].
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "mixed version clusters are not supported"};
_ ->
rabbit_ct_helpers:log_environment(),
Config
end.
end_per_suite(Config) ->
Config.
init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:run_setup_steps(
Config,
[fun rabbit_ct_broker_helpers:configure_dist_proxy/1]),
rabbit_ct_helpers:set_config(Config1,
[{rmq_nodename_suffix, Group},
{net_ticktime, ?NET_TICKTIME_S}]).
end_per_group(_, Config) ->
Config.
init_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, TestCase),
Config2 = rabbit_ct_helpers:set_config(
Config1, [{rmq_nodes_clustered, true},
{rmq_nodes_count, 3},
{tcp_ports_base}
]),
rabbit_ct_helpers:run_setup_steps(
Config2,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{aten,
[{poll_interval,
1000}]})
end,
fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{stream_cmd_timeout, 5000},
{stream_sac_disconnected_timeout,
2000}]})
end]
++ rabbit_ct_broker_helpers:setup_steps()).
end_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:testcase_finished(Config, TestCase),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:teardown_steps()).
simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
S = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
init_stream(Config, CL, S),
[L, F1, F2] = topology(Config, S),
%% the stream leader and the coordinator leader are on the same node
%% another node will be isolated
?assertEqual(L#node.name, coordinator_leader(Config)),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
C0_01 = register_sac(So0, C0_00, S, 0),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, S, 1),
C2_01 = register_sac(So2, C2_00, S, 2),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, S),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, LN, S),
wait_for_presumed_down_consumer(Config, LN, S),
log("Node ~p rejoins cluster", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_all_consumers_connected(Config, LN, S),
Consumers2 = query_consumers(Config, LN, S),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
?assertMatch([DisconnectedConsumer],
Consumers1 -- Consumers2),
%% assert the cancelled consumer received a metadata update frame
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
delete_stream(stream_port(Config, 0), S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
[CF1, CF2] = all_nodes(Config) -- [CL],
S = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
init_stream(Config, CF1, S),
[L, _F1, _F2] = topology(Config, S),
%% the stream leader and the coordinator leader are not on the same node
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Coordinator leader on: ~0p~nStream leader on: ~0p", [CL, L#node.name]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
C0_01 = register_sac(So0, C0_00, S, 0),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, S, 1),
C2_01 = register_sac(So2, C2_00, S, 2),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, S),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
%% N1 is the coordinator leader
Isolated = CL,
NotIsolated = CF1,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),
wait_for_disconnected_consumer(Config, NotIsolated, S),
wait_for_presumed_down_consumer(Config, NotIsolated, S),
log("Node ~p rejoins cluster", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, S),
Consumers2 = query_consumers(Config, NotIsolated, S),
log("Consumers after partition resolution ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
[#consumer{subscription_id = ActiveSubId}] =
lists:filter(fun(#consumer{status = St}) ->
St =:= {connected, active}
end, Consumers2),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
delete_stream(L#node.stream_port, S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
Ss = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Partition = init_super_stream(Config, CL, Ss, 1, CL),
[L, F1, F2] = topology(Config, Partition),
wait_for_coordinator_ready(Config),
%% we expect the stream leader and the coordinator leader to be on the same node
%% another node will be isolated
?assertEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
C0_01 = register_sac(So0, C0_00, Partition, 0, Ss),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, Partition, 1, Ss),
C2_01 = register_sac(So2, C2_00, Partition, 2, Ss),
SubIdToState0 = #{0 => {So0, C0_02},
1 => {So1, C1_01},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, Partition),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
NotIsolated = F2N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Config) ->
init_coordinator(Config),
CL = coordinator_leader(Config),
[CF1, _] = all_nodes(Config) -- [CL],
Ss = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Partition = init_super_stream(Config, CL, Ss, 2, CF1),
[L, F1, F2] = topology(Config, Partition),
wait_for_coordinator_ready(Config),
%% check stream leader and coordinator are not on the same node
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
C0_01 = register_sac(So0, C0_00, Partition, 0, Ss),
C0_02 = receive_consumer_update(So0, C0_01),
C1_01 = register_sac(So1, C1_00, Partition, 1, Ss),
%% former active gets de-activated
C0_03 = receive_consumer_update_and_respond(So0, C0_02),
%% gets activated
C1_02 = receive_consumer_update_and_respond(So1, C1_01),
C2_01 = register_sac(So2, C2_00, Partition, 2, Ss),
SubIdToState0 = #{0 => {So0, C0_03},
1 => {So1, C1_02},
2 => {So2, C2_01}},
Consumers1 = query_consumers(Config, Partition),
assertSize(3, Consumers1),
assertConsumersConnected(Consumers1),
LN = L#node.name,
F1N = F1#node.name,
F2N = F2#node.name,
Isolated = F1N,
NotIsolated = F2N,
{value, DisconnectedConsumer} =
lists:search(fun(#consumer{pid = ConnPid}) ->
rpc(Config, erlang, node, [ConnPid]) =:= Isolated
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_coordinator_ready(Config),
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
assertConsumersConnected(Consumers2),
assertEmpty(lists:filter(fun(C) ->
same_consumer(DisconnectedConsumer, C)
end, Consumers2)),
[#consumer{subscription_id = ActiveSubId}] =
lists:filter(fun(#consumer{status = St}) ->
St =:= {connected, active}
end, Consumers2),
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
same_consumer(#consumer{owner = P1, subscription_id = Id1},
#consumer{owner = P2, subscription_id = Id2})
when P1 == P2 andalso Id1 == Id2 ->
true;
same_consumer(_, _) ->
false.
cluster_nodes(Config) ->
lists:map(fun(N) ->
#node{name = node_config(Config, N, nodename),
stream_port = stream_port(Config, N)}
end, lists:seq(0, node_count(Config) - 1)).
node_count(Config) ->
test_server:lookup_config(rmq_nodes_count, Config).
nodename(Config, N) ->
node_config(Config, N, nodename).
stream_port(Config, N) ->
node_config(Config, N, tcp_port_stream).
node_config(Config, N, K) ->
rabbit_ct_broker_helpers:get_node_config(Config, N, K).
topology(Config, St) ->
Members = stream_members(Config, St),
LN = leader(Members),
Nodes = cluster_nodes(Config),
[L] = lists:filter(fun(#node{name = N}) ->
N =:= LN
end, Nodes),
[F1, F2] = lists:filter(fun(#node{name = N}) ->
N =/= LN
end, Nodes),
[L, F1, F2].
leader(Members) ->
maps:fold(fun(Node, {_, writer}, _Acc) ->
Node;
(_, _, Acc) ->
Acc
end, undefined, Members).
stream_members(Config, Stream) ->
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [Stream, <<"/">>]),
#{name := StreamId} = amqqueue:get_type_state(Q),
State = rpc(Config, rabbit_stream_coordinator, state, []),
{ok, Members} = rpc(Config, rabbit_stream_coordinator, query_members,
[StreamId, State]),
Members.
init_coordinator(Config) ->
%% to make sure the coordinator is initialized
init_stream(Config, 0, <<"dummy">>),
delete_stream(stream_port(Config, 0), <<"dummy">>),
wait_for_coordinator_ready(Config).
init_stream(Config, N, St) ->
{ok, S, C0} = stream_test_utils:connect(stream_port(Config, N)),
{ok, C1} = stream_test_utils:create_stream(S, C0, St),
NC = node_count(Config),
wait_for_members(S, C1, St, NC),
{ok, _} = stream_test_utils:close(S, C1).
delete_stream(Port, St) ->
{ok, S, C0} = stream_test_utils:connect(Port),
{ok, C1} = stream_test_utils:delete_stream(S, C0, St),
{ok, _} = stream_test_utils:close(S, C1).
init_super_stream(Config, Node, Ss, PartitionIndex, ExpectedNode) ->
{ok, S, C0} = stream_test_utils:connect(Config, Node),
NC = node_count(Config),
Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)])
|| N <- lists:seq(0, NC - 1)],
Bks = [integer_to_binary(N) || N <- lists:seq(0, NC - 1)],
SsCreationFrame = request({create_super_stream, Ss, Partitions, Bks, #{}}),
ok = ?TRSPT:send(S, SsCreationFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
[wait_for_members(S, C1, P, NC) || P <- Partitions],
Partition = lists:nth(PartitionIndex, Partitions),
[#node{name = LN} | _] = topology(Config, Partition),
P = case LN of
ExpectedNode ->
Partition;
_ ->
enforce_stream_leader_on_node(Config, S, C1,
Partitions, Partition,
ExpectedNode, 10)
end,
{ok, _} = stream_test_utils:close(S, C1),
P.
enforce_stream_leader_on_node(_, _, _, _, _, _, 0) ->
ct:fail("could not create super stream partition on chosen node");
enforce_stream_leader_on_node(Config, S, C,
Partitions, Partition, Node, Count) ->
CL = coordinator_leader(Config),
NC = node_count(Config),
[begin
case P of
Partition ->
restart_stream(Config, CL, P, Node);
_ ->
restart_stream(Config, CL, P, undefined)
end,
wait_for_members(S, C, P, NC)
end || P <- Partitions],
[#node{name = LN} | _] = topology(Config, Partition),
case LN of
Node ->
Partition;
_ ->
timer:sleep(500),
enforce_stream_leader_on_node(Config, S, C,
Partitions, Partition, Node,
Count - 1)
end.
delete_super_stream(Port, Ss) ->
{ok, S, C0} = stream_test_utils:connect(Port),
SsDeletionFrame = request({delete_super_stream, Ss}),
ok = ?TRSPT:send(S, SsDeletionFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {delete_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
{ok, _} = stream_test_utils:close(S, C1).
register_sac(S, C0, St, SubId, SuperStream) ->
register_sac0(S, C0, St, SubId, #{<<"super-stream">> => SuperStream}).
register_sac(S, C0, St, SubId) ->
register_sac0(S, C0, St, SubId, #{}).
register_sac0(S, C0, St, SubId, Args) ->
SacSubscribeFrame = request({subscribe, SubId, St,
first, 1,
Args#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => name()}}),
ok = ?TRSPT:send(S, SacSubscribeFrame),
{Cmd1, C1} = receive_commands(S, C0),
?assertMatch({response, ?CORR_ID, {subscribe, ?RESPONSE_CODE_OK}},
Cmd1),
C1.
receive_consumer_update(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({request, _CorrId, {consumer_update, _SubId, _Status}},
Cmd),
C1.
receive_consumer_update_and_respond(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({request, _CorrId, {consumer_update, _SubId, _Status}},
Cmd),
{request, CorrId, {consumer_update, _SubId, _Status}} = Cmd,
Frame = response(CorrId, {consumer_update, ?RESPONSE_CODE_OK, first}),
ok = ?TRSPT:send(S, Frame),
C1.
receive_metadata_update(S, C0) ->
{Cmd, C1} = receive_commands(S, C0),
?assertMatch({metadata_update, _, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE},
Cmd),
C1.
unsubscribe(S, C0) ->
{ok, C1} = stream_test_utils:unsubscribe(S, C0, sub_id()),
C1.
query_consumers(Config, Stream) ->
query_consumers(Config, 0, Stream).
query_consumers(Config, Node, Stream) ->
Key = group_key(Stream),
#?SAC_STATE{groups = #{Key := #group{consumers = Consumers}}} =
rpc(Config, Node, rabbit_stream_coordinator, sac_state, []),
Consumers.
all_nodes(Config) ->
lists:map(fun(N) ->
nodename(Config, N)
end, lists:seq(0, node_count(Config) - 1)).
coordinator_status(Config) ->
rpc(Config, rabbit_stream_coordinator, status, []).
coordinator_leader(Config) ->
Status = coordinator_status(Config),
case lists:search(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
RS == leader
end, Status) of
{value, Leader} ->
proplists:get_value(<<"Node Name">>, Leader, undefined);
_ ->
undefined
end.
restart_stream(Config, Node, S, undefined) ->
rpc(Config, Node, rabbit_stream_queue, restart_stream, [<<"/">>, S, #{}]);
restart_stream(Config, Node, S, Leader) ->
Opts = #{preferred_leader_node => Leader},
rpc(Config, Node, rabbit_stream_queue, restart_stream, [<<"/">>, S, Opts]).
rpc(Config, M, F, A) ->
rpc(Config, 0, M, F, A).
rpc(Config, Node, M, F, A) ->
rabbit_ct_broker_helpers:rpc(Config, Node, M, F, A).
group_key(Stream) ->
{<<"/">>, Stream, name()}.
request(Cmd) ->
request(?CORR_ID, Cmd).
request(CorrId, Cmd) ->
rabbit_stream_core:frame({request, CorrId, Cmd}).
response(CorrId, Cmd) ->
rabbit_stream_core:frame({response, CorrId, Cmd}).
receive_commands(S, C) ->
receive_commands(?TRSPT, S, C).
receive_commands(Transport, S, C) ->
stream_test_utils:receive_stream_commands(Transport, S, C).
sub_id() ->
0.
name() ->
<<"app">>.
wait_for_members(S, C, St, ExpectedCount) ->
T = ?TRSPT,
GetStreamNodes =
fun() ->
MetadataFrame = request({metadata, [St]}),
ok = gen_tcp:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(T, S, C),
{response, 1,
{metadata, _Nodes, #{St := {Leader = {_H, _P}, Replicas}}}} =
CmdMetadata,
[Leader | Replicas]
end,
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == ExpectedCount
end).
wait_for_disconnected_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a disconnected consumer: ~p", [Cs]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
lists:any(fun(#consumer{status = {disconnected, _}}) ->
true;
(_) ->
false
end, Cs)
end).
wait_for_presumed_down_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a presumed-down consumer: ~p", [Cs]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
true;
(_) ->
false
end, Cs)
end).
wait_for_all_consumers_connected(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting connected consumers: ~p", [Cs]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
lists:all(fun(#consumer{status = {connected, _}}) ->
true;
(_) ->
false
end, Cs)
end, 30_000).
wait_for_coordinator_ready(Config) ->
NC = node_count(Config),
rabbit_ct_helpers:await_condition(
fun() ->
Status = coordinator_status(Config),
log("Coordinator status: ~p", [Status]),
Prevent blocked groups in stream SAC with fine-grained status A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation detail: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster. Fixes #14070
2025-06-10 18:01:18 +08:00
lists:all(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
RS == leader orelse RS == follower
end, Status) andalso length(Status) == NC
end).
assertConsumersConnected(Consumers) when length(Consumers) > 0 ->
lists:foreach(fun(#consumer{status = St}) ->
?assertMatch({connected, _}, St,
"Consumer should be connected")
end, Consumers);
assertConsumersConnected(_) ->
?assert(false, "The consumer list is empty").
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->
?assertEqual(Expected, maps:size(Map));
assertSize(Expected, List) when is_list(List) ->
?assertEqual(Expected, length(List)).
assertEmpty(Data) ->
assertSize(0, Data).
log(Format) ->
ct:pal(Format).
log(Format, Args) ->
ct:pal(Format, Args).
close_connection(Sock, C) ->
CloseReason = <<"OK">>,
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
ok = gen_tcp:send(Sock, CloseFrame),
pump_until_close(Sock, C, 10).
pump_until_close(_, _, 0) ->
ct:fail("did not get close response");
pump_until_close(Sock, C0, N) ->
case stream_test_utils:receive_stream_commands(Sock, C0) of
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} ->
{ok, C1};
{_Cmd, C1} ->
pump_until_close(Sock, C1, N - 1)
end.