%% rabbitmq-server/deps/rabbitmq_stream/test/commands_SUITE.erl
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(commands_SUITE).
-compile(nowarn_export_all).
-compile([export_all]).
% -include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-define(WAIT, 5000).
-define(COMMAND_LIST_CONNECTIONS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-define(COMMAND_LIST_CONSUMERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
-define(COMMAND_LIST_PUBLISHERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
-define(COMMAND_ADD_SUPER_STREAM,
'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
-define(COMMAND_DELETE_SUPER_STREAM_CLI,
'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
-define(COMMAND_LIST_CONSUMER_GROUPS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand').
-define(COMMAND_LIST_GROUP_CONSUMERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').
-define(COMMAND_LIST_STREAM_TRACKING,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
-define(COMMAND_RESET_OFFSET,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
%% Every testcase group of this suite; CT runs them in this order.
all() ->
    [{group, Group}
     || Group <- [list_connections,
                  list_consumers,
                  list_publishers,
                  list_consumer_groups,
                  list_group_consumers,
                  activate_consumer,
                  list_stream_tracking,
                  reset_offset,
                  super_streams]].
%% Testcases per group: each command is covered by a merge_defaults and a
%% run testcase (plus validate where the command takes mandatory options).
groups() ->
    [{list_connections, [],
      [list_connections_merge_defaults,
       list_connections_run,
       list_tls_connections_run]},
     {list_consumers, [],
      [list_consumers_merge_defaults,
       list_consumers_run]},
     {list_publishers, [],
      [list_publishers_merge_defaults,
       list_publishers_run]},
     {list_consumer_groups, [],
      [list_consumer_groups_validate,
       list_consumer_groups_merge_defaults,
       list_consumer_groups_run]},
     {list_group_consumers, [],
      [list_group_consumers_validate,
       list_group_consumers_merge_defaults,
       list_group_consumers_run]},
     {activate_consumer, [],
      [activate_consumer_validate,
       activate_consumer_merge_defaults,
       activate_consumer_run]},
     {list_stream_tracking, [],
      [list_stream_tracking_validate,
       list_stream_tracking_merge_defaults,
       list_stream_tracking_run]},
     {reset_offset, [],
      [reset_offset_validate,
       reset_offset_merge_defaults,
       reset_offset_run]},
     {super_streams, [],
      [add_super_stream_merge_defaults,
       add_super_stream_validate,
       delete_super_stream_merge_defaults,
       delete_super_stream_validate,
       add_delete_super_stream_run]}].
%% Suite setup: start a broker node with TLS certificate verification
%% disabled. Skipped entirely on mixed-version clusters.
init_per_suite(Config) ->
    case rabbit_ct_helpers:is_mixed_versions() of
        true ->
            {skip, "mixed version clusters are not supported for this suite"};
        _ ->
            Suffixed =
                rabbit_ct_helpers:set_config(Config,
                                             [{rmq_nodename_suffix, ?MODULE}]),
            WithTls =
                rabbit_ct_helpers:set_config(Suffixed,
                                             {rabbitmq_ct_tls_verify,
                                              verify_none}),
            rabbit_ct_helpers:log_environment(),
            rabbit_ct_helpers:run_setup_steps(WithTls,
                                              rabbit_ct_broker_helpers:setup_steps())
    end.
%% Suite teardown: stop the broker node started in init_per_suite/1.
end_per_suite(Config) ->
    Steps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, Steps).
%% No per-group setup required; hand the config through unchanged.
init_per_group(_Group, Config) ->
    Config.
%% No per-group teardown required; hand the config through unchanged.
end_per_group(_Group, Config) ->
    Config.
%% Notify the CT helpers that a testcase is starting.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).
%% Notify the CT helpers that a testcase has finished.
end_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% merge_defaults/2 must supply the default columns (node, conn_name) and
%% verbose => false, while leaving explicitly requested columns and an
%% explicit verbose flag untouched.
list_connections_merge_defaults(_Config) ->
    {[<<"node">>, <<"conn_name">>], #{verbose := false}} =
        ?COMMAND_LIST_CONNECTIONS:merge_defaults([], #{}),
    MergeOther =
        fun(Verbose) ->
                ?COMMAND_LIST_CONNECTIONS:merge_defaults([<<"other_key">>],
                                                         #{verbose => Verbose})
        end,
    {[<<"other_key">>], #{verbose := true}} = MergeOther(true),
    {[<<"other_key">>], #{verbose := false}} = MergeOther(false).
%% Exercises ListStreamConnectionsCommand: only stream-protocol connections
%% are counted (AMQP connections are ignored), and verbose mode returns
%% exactly the ?INFO_ITEMS columns.
list_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false},
    %% no connections yet
    [] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    [[{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    [[{ssl, false}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),
    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    Port =
        rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
    start_amqp_connection(network, Node, Port),
    %% still just the two stream connections
    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    start_amqp_connection(direct, Node, Port),
    %% still two stream connections, one direct AMQP 0-9-1 connection
    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    %% verbose returns all keys
    Infos = [atom_to_binary(Item, utf8) || Item <- ?INFO_ITEMS],
    AllKeys = to_list(?COMMAND_LIST_CONNECTIONS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),
    %% two rows, each carrying exactly the INFO_ITEMS keys
    [First, _Second] = AllKeys,
    ?assertEqual(length(?INFO_ITEMS), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- ?INFO_ITEMS),
    ?assertEqual([], ?INFO_ITEMS -- Keys),
    rabbit_stream_SUITE:test_close(gen_tcp, S1, C1),
    rabbit_stream_SUITE:test_close(gen_tcp, S2, C2),
    ok.
%% Like list_connections_run/1 but over TLS: the ssl column must be true.
list_tls_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false},
    %% no connections yet
    [] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),
    TlsPort = rabbit_stream_SUITE:get_stream_port_tls(Config),
    application:ensure_all_started(ssl),
    {Sock, Conn} = start_stream_tls_connection(TlsPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    [[{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    [[{ssl, true}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),
    rabbit_stream_SUITE:test_close(ssl, Sock, Conn),
    ok.
%% merge_defaults/2 defaults to all consumer info items except
%% connection_pid and node, with verbose => false; explicit arguments
%% and verbose flags are preserved.
list_consumers_merge_defaults(_Config) ->
    DefaultItems =
        [rabbit_data_coercion:to_binary(Item)
         || Item <- ?CONSUMER_INFO_ITEMS -- [connection_pid, node]],
    {DefaultItems, #{verbose := false}} =
        ?COMMAND_LIST_CONSUMERS:merge_defaults([], #{}),
    MergeOther =
        fun(Verbose) ->
                ?COMMAND_LIST_CONSUMERS:merge_defaults([<<"other_key">>],
                                                       #{verbose => Verbose})
        end,
    {[<<"other_key">>], #{verbose := true}} = MergeOther(true),
    {[<<"other_key">>], #{verbose := false}} = MergeOther(false).
%% Exercises ListStreamConsumersCommand: consumer counting across two
%% connections, verbose mode returning exactly ?CONSUMER_INFO_ITEMS, and
%% consumers disappearing once their stream is deleted.
list_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          verbose => false,
          vhost => <<"/">>},
    %% no connections, no consumers
    [] = to_list(?COMMAND_LIST_CONSUMERS:run([], Opts)),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    Stream = <<"list_consumers_run">>,
    C1_1 = create_stream(S1, Stream, C1),
    SubId = 42,
    C1_2 = subscribe(S1, SubId, Stream, C1_1),
    ?awaitMatch(1, consumer_count(Config), ?WAIT),
    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    C2_1 = subscribe(S2, SubId, Stream, C2),
    ?awaitMatch(2, consumer_count(Config), ?WAIT),
    %% verbose returns all keys
    InfoItems = ?CONSUMER_INFO_ITEMS,
    Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
    AllKeys = to_list(?COMMAND_LIST_CONSUMERS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONSUMERS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),
    %% there are two consumers
    [First, _Second] = AllKeys,
    %% keys are info items
    ?assertEqual(length(InfoItems), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),
    C1_3 = delete_stream(S1, Stream, C1_2),
    %% fix: thread the connection state returned by the metadata-update
    %% helper into close/2, as list_publishers_run/1 does, instead of
    %% closing with the stale C2_1
    C2_2 = metadata_update_stream_deleted(S2, Stream, C2_1),
    close(S1, C1_3),
    close(S2, C2_2),
    ?awaitMatch(0, consumer_count(Config), ?WAIT),
    ok.
%% merge_defaults/2 defaults to all publisher info items except
%% connection_pid and node, with verbose => false; explicit arguments
%% and verbose flags are preserved.
list_publishers_merge_defaults(_Config) ->
    DefaultItems =
        [rabbit_data_coercion:to_binary(Item)
         || Item <- ?PUBLISHER_INFO_ITEMS -- [connection_pid, node]],
    {DefaultItems, #{verbose := false}} =
        ?COMMAND_LIST_PUBLISHERS:merge_defaults([], #{}),
    MergeOther =
        fun(Verbose) ->
                ?COMMAND_LIST_PUBLISHERS:merge_defaults([<<"other_key">>],
                                                        #{verbose => Verbose})
        end,
    {[<<"other_key">>], #{verbose := true}} = MergeOther(true),
    {[<<"other_key">>], #{verbose := false}} = MergeOther(false).
%% Exercises ListStreamPublishersCommand: publisher counting across two
%% connections, verbose mode returning exactly ?PUBLISHER_INFO_ITEMS, and
%% publishers disappearing once their stream is deleted.
list_publishers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false,
             vhost => <<"/">>},
    %% no connections, no publishers
    [] = to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts)),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    Stream = <<"list_publishers_run">>,
    C1a = create_stream(S1, Stream, C1),
    PubId = 42,
    C1b = declare_publisher(S1, PubId, Stream, C1a),
    ?awaitMatch(1, publisher_count(Config), ?WAIT),
    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    C2a = declare_publisher(S2, PubId, Stream, C2),
    ?awaitMatch(2, publisher_count(Config), ?WAIT),
    %% verbose returns all keys
    InfoItems = ?PUBLISHER_INFO_ITEMS,
    Infos = [atom_to_binary(Item, utf8) || Item <- InfoItems],
    AllKeys = to_list(?COMMAND_LIST_PUBLISHERS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),
    %% two rows, each carrying exactly the info items
    [First, _Second] = AllKeys,
    ?assertEqual(length(InfoItems), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),
    C1c = delete_stream(S1, Stream, C1b),
    C2b = metadata_update_stream_deleted(S2, Stream, C2a),
    close(S1, C1c),
    close(S2, C2b),
    ?awaitMatch(0, publisher_count(Config), ?WAIT),
    ok.
%% validate/2: unknown info keys are rejected; known keys or no keys pass.
list_consumer_groups_validate(_) ->
    Cmd = ?COMMAND_LIST_CONSUMER_GROUPS,
    ValidOpts = #{vhost => <<"/">>},
    ?assertMatch({validation_failure, {bad_info_key, [foo]}},
                 Cmd:validate([<<"foo">>], ValidOpts)),
    ?assertMatch(ok, Cmd:validate([<<"reference">>], ValidOpts)),
    ?assertMatch(ok, Cmd:validate([], ValidOpts)).
%% merge_defaults/2 defaults to all consumer-group info items, vhost "/"
%% and verbose => false; explicit arguments and verbose flags are kept.
list_consumer_groups_merge_defaults(_Config) ->
    DefaultItems =
        [rabbit_data_coercion:to_binary(Item)
         || Item <- ?CONSUMER_GROUP_INFO_ITEMS],
    {DefaultItems, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([], #{}),
    MergeOther =
        fun(Verbose) ->
                ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([<<"other_key">>],
                                                             #{verbose =>
                                                                   Verbose})
        end,
    {[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} =
        MergeOther(true),
    {[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} =
        MergeOther(false).
%% Exercises ListStreamConsumerGroupsCommand against two single-active-
%% consumer groups (three subscribers each) on one connection.
list_consumer_groups_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>,
             verbose => true},
    %% no connections, no consumers
    {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C0} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    Ref = <<"foo">>,
    SubProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => Ref},
    Stream1 = <<"list_consumer_groups_run_1">>,
    C1 = create_stream(S, Stream1, C0),
    C2 = subscribe(S, 0, Stream1, SubProperties, C1),
    %% first subscriber gets the consumer-update exchange
    C3 = handle_consumer_update(S, C2, 0),
    C4 = subscribe(S, 1, Stream1, SubProperties, C3),
    C5 = subscribe(S, 2, Stream1, SubProperties, C4),
    ?awaitMatch(3, consumer_count(Config), ?WAIT),
    {ok, [CG1]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    assertConsumerGroup(Stream1, Ref, -1, 3, CG1),
    Stream2 = <<"list_consumer_groups_run_2">>,
    C6 = create_stream(S, Stream2, C5),
    C7 = subscribe(S, 3, Stream2, SubProperties, C6),
    C8 = handle_consumer_update(S, C7, 3),
    C9 = subscribe(S, 4, Stream2, SubProperties, C8),
    C10 = subscribe(S, 5, Stream2, SubProperties, C9),
    ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
    %% CG1 is already bound, so this match also checks the first group
    %% is reported unchanged
    {ok, [CG1, CG2]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    assertConsumerGroup(Stream1, Ref, -1, 3, CG1),
    assertConsumerGroup(Stream2, Ref, -1, 3, CG2),
    C11 = delete_stream(S, Stream1, C10),
    C12 = delete_stream(S, Stream2, C11),
    close(S, C12),
    {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    ok.
%% validate/2: stream and reference options are mandatory, unknown info
%% keys are rejected, known keys or no keys pass.
list_group_consumers_validate(_) ->
    Cmd = ?COMMAND_LIST_GROUP_CONSUMERS,
    ValidOpts = #{vhost => <<"/">>,
                  stream => <<"s1">>,
                  reference => <<"foo">>},
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{vhost => <<"test">>})),
    ?assertMatch({validation_failure, {bad_info_key, [foo]}},
                 Cmd:validate([<<"foo">>], ValidOpts)),
    ?assertMatch(ok, Cmd:validate([<<"subscription_id">>], ValidOpts)),
    ?assertMatch(ok, Cmd:validate([], ValidOpts)).
%% merge_defaults/2 defaults to all group-consumer info items, vhost "/"
%% and verbose => false; explicit arguments and verbose flags are kept.
list_group_consumers_merge_defaults(_Config) ->
    DefaultItems =
        [rabbit_data_coercion:to_binary(Item)
         || Item <- ?GROUP_CONSUMER_INFO_ITEMS],
    {DefaultItems, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([], #{}),
    MergeOther =
        fun(Verbose) ->
                ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([<<"other_key">>],
                                                             #{verbose =>
                                                                   Verbose})
        end,
    {[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} =
        MergeOther(true),
    {[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} =
        MergeOther(false).
%% Exercises ListStreamGroupConsumersCommand: per-group consumer listing
%% with subscription id and state columns, across two SAC groups; the
%% command returns {error, not_found} before the group exists and after
%% its stream is deleted.
list_group_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          vhost => <<"/">>,
          verbose => false},
    Args = [<<"subscription_id">>, <<"state">>],
    Stream1 = <<"list_group_consumers_run_1">>,
    ConsumerReference = <<"foo">>,
    OptsGroup1 =
        maps:merge(#{stream => Stream1, reference => ConsumerReference},
                   Opts),
    %% the group does not exist yet
    {error, not_found} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C0} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    SubProperties =
        #{<<"single-active-consumer">> => <<"true">>,
          <<"name">> => ConsumerReference},
    %% fix: thread the connection state returned by each helper instead of
    %% discarding it, consistent with list_consumer_groups_run/1
    C1 = create_stream(S, Stream1, C0),
    C2 = subscribe(S, 0, Stream1, SubProperties, C1),
    C3 = handle_consumer_update(S, C2, 0),
    C4 = subscribe(S, 1, Stream1, SubProperties, C3),
    C5 = subscribe(S, 2, Stream1, SubProperties, C4),
    ?awaitMatch(3, consumer_count(Config), ?WAIT),
    {ok, Consumers1} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
    ?assertEqual([[{subscription_id, 0}, {state, "active (connected)"}],
                  [{subscription_id, 1}, {state, "waiting (connected)"}],
                  [{subscription_id, 2}, {state, "waiting (connected)"}]],
                 Consumers1),
    Stream2 = <<"list_group_consumers_run_2">>,
    OptsGroup2 =
        maps:merge(#{stream => Stream2, reference => ConsumerReference},
                   Opts),
    C6 = create_stream(S, Stream2, C5),
    C7 = subscribe(S, 3, Stream2, SubProperties, C6),
    C8 = handle_consumer_update(S, C7, 3),
    C9 = subscribe(S, 4, Stream2, SubProperties, C8),
    C10 = subscribe(S, 5, Stream2, SubProperties, C9),
    ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
    {ok, Consumers2} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
    ?assertEqual([[{subscription_id, 3}, {state, "active (connected)"}],
                  [{subscription_id, 4}, {state, "waiting (connected)"}],
                  [{subscription_id, 5}, {state, "waiting (connected)"}]],
                 Consumers2),
    C11 = delete_stream(S, Stream1, C10),
    C12 = delete_stream(S, Stream2, C11),
    {error, not_found} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
    close(S, C12),
    ok.
%% Argument validation of the activate-consumer command: vhost, stream and
%% reference options are mandatory and no positional argument is accepted.
activate_consumer_validate(_) ->
    GoodOpts = #{vhost => <<"/">>,
                 stream => <<"s1">>,
                 reference => <<"foo">>},
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_ACTIVATE_STREAM_CONSUMER:validate([], #{})),
    %% vhost alone is not sufficient
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_ACTIVATE_STREAM_CONSUMER:validate([], #{vhost => <<"test">>})),
    %% positional arguments are rejected
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_ACTIVATE_STREAM_CONSUMER:validate([<<"foo">>], GoodOpts)),
    ?assertMatch(ok, ?COMMAND_ACTIVATE_STREAM_CONSUMER:validate([], GoodOpts)).
%% merge_defaults fills in the default vhost ("/") and leaves an explicit
%% vhost untouched.
activate_consumer_merge_defaults(_Config) ->
    Defaults = #{vhost => <<"/">>,
                 stream => <<"s1">>,
                 reference => <<"foo">>},
    %% without vhost: the default "/" is added
    ?assertEqual({[], Defaults},
                 ?COMMAND_ACTIVATE_STREAM_CONSUMER:merge_defaults(
                     [], maps:remove(vhost, Defaults))),
    %% an explicitly provided vhost is preserved as-is
    WithVhost = maps:merge(Defaults, #{vhost => "vhost"}),
    ?assertEqual({[], WithVhost},
                 ?COMMAND_ACTIVATE_STREAM_CONSUMER:merge_defaults([], WithVhost)).
%% End-to-end check of the activate-consumer command against a live broker:
%% creates a single-active-consumer group with 3 subscriptions and verifies
%% the command triggers consumer activation.
activate_consumer_run(Config) ->
    Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =#{node => Node,
            timeout => 10000,
            vhost => <<"/">>},
    Args = [],
    %% use the test case name as the stream name
    St = atom_to_binary(?FUNCTION_NAME, utf8),
    ConsumerReference = <<"foo">>,
    OptsGroup = maps:merge(#{stream => St, reference => ConsumerReference},
                           Opts),
    %% the group does not exist yet
    ?assertEqual({error, not_found}, Cmd:run(Args, OptsGroup)),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    %% properties that make the subscriptions join a SAC group
    SubProperties =#{<<"single-active-consumer">> => <<"true">>,
                     <<"name">> => ConsumerReference},
    create_stream(S, St, C),
    subscribe(S, 0, St, SubProperties, C),
    %% the first subscriber becomes active; ack its consumer_update request
    handle_consumer_update(S, C, 0),
    subscribe(S, 1, St, SubProperties, C),
    subscribe(S, 2, St, SubProperties, C),
    ?awaitMatch(3, consumer_count(Config), ?WAIT),
    %% now the group exists, the command must succeed
    ?assertEqual(ok, Cmd:run(Args, OptsGroup)),
    delete_stream(S, St, C),
    close(S, C),
    ok.
%% Waits for the consumer_update request the broker sends to an activated
%% SAC subscription and replies OK/next. Returns the updated protocol state.
handle_consumer_update(Sock, Conn0, SubId) ->
    {{request, CorrId, {consumer_update, SubId, true}}, Conn1} =
        rabbit_stream_SUITE:receive_commands(gen_tcp, Sock, Conn0),
    Response = {response, CorrId, {consumer_update, ?RESPONSE_CODE_OK, next}},
    ok = gen_tcp:send(Sock, rabbit_stream_core:frame(Response)),
    Conn1.
%% Asserts that the consumer-group Record (a proplist) carries the expected
%% stream, reference, partition index, and consumers.
assertConsumerGroup(Stream, Ref, PartitionIndex, Consumers, Record) ->
    Expected = [{stream, Stream},
                {reference, Ref},
                {partition_index, PartitionIndex},
                {consumers, Consumers}],
    [?assertEqual(Value, proplists:get_value(Key, Record))
     || {Key, Value} <- Expected],
    ok.
%% Argument validation of list_stream_tracking: exactly one stream argument
%% and at most one of --all/--offset/--writer.
list_stream_tracking_validate(_) ->
    Cmd = ?COMMAND_LIST_STREAM_TRACKING,
    GoodOpts = #{vhost => <<"/">>, <<"writer">> => true},
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    %% a vhost alone is not enough, the stream argument is mandatory
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{vhost => <<"test">>})),
    %% the tracking-type switches are mutually exclusive
    ?assertMatch({validation_failure, "Specify only one of --all, --offset, --writer."},
                 Cmd:validate([<<"stream">>], #{all => true, writer => true})),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"stream">>, <<"bad">>], GoodOpts)),
    ?assertMatch(ok, Cmd:validate([<<"stream">>], GoodOpts)).
%% merge_defaults picks --all when no tracking type is given and vhost "/"
%% when no vhost is given; explicit values are preserved.
list_stream_tracking_merge_defaults(_Config) ->
    Cmd = ?COMMAND_LIST_STREAM_TRACKING,
    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"/">>}},
                 Cmd:merge_defaults([<<"s">>], #{})),
    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"/">>}},
                 Cmd:merge_defaults([<<"s">>], #{writer => true})),
    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"dev">>}},
                 Cmd:merge_defaults([<<"s">>], #{vhost => <<"dev">>})),
    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"dev">>}},
                 Cmd:merge_defaults([<<"s">>], #{writer => true, vhost => <<"dev">>})).
%% End-to-end test of list_stream_tracking against a live broker. The
%% original body was corrupted by interleaved VCS-blame noise, removed here.
%% Covers offset tracking, writer (publisher sequence) tracking, and the
%% --all / --offset / --writer filters.
list_stream_tracking_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Stream = <<"list_stream_tracking_run">>,
    ConsumerReference = <<"foo">>,
    PublisherReference = <<"bar">>,
    Opts =
        #{node => Node,
          timeout => 10000,
          vhost => <<"/">>},
    Args = [Stream],
    %% the stream does not exist yet
    ?assertMatch({error, "The stream does not exist."},
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    create_stream(S, Stream, C),
    %% no tracking information yet
    ?assertMatch([],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
    ok = store_offset(S, Stream, ConsumerReference, 42, C),
    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 42}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 42}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})),
    %% the stored offset can be overwritten
    ok = store_offset(S, Stream, ConsumerReference, 55, C),
    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 55}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})),
    %% declare a named publisher and publish to create writer tracking
    PublisherId = 1,
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PublisherId,
                                               PublisherReference, Stream, C),
    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 42, <<"">>, C),
    ok = check_publisher_sequence(S, Stream, PublisherReference, 42, C),
    %% --all returns both writer and offset tracking entries
    ?assertMatch([[{type, writer}, {name, <<"bar">>}, {tracking_value, 42}],
                  [{type, offset}, {name, <<"foo">>}, {tracking_value, 55}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
    ?assertMatch([[{type, writer}, {name, <<"bar">>}, {tracking_value, 42}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})),
    %% publishing again advances the writer sequence
    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 66, <<"">>, C),
    ok = check_publisher_sequence(S, Stream, PublisherReference, 66, C),
    ?assertMatch([[{type, writer}, {name, <<"bar">>}, {tracking_value, 66}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})),
    delete_stream(S, Stream, C),
    close(S, C),
    ok.
%% Argument validation of the reset-offset command, including the maximum
%% tracking-reference length (255 bytes).
reset_offset_validate(_) ->
    GoodOpts = #{vhost => <<"/">>,
                 stream => <<"s1">>,
                 reference => <<"foo">>},
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_RESET_OFFSET:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_RESET_OFFSET:validate([], #{vhost => <<"test">>})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_RESET_OFFSET:validate([<<"foo">>], GoodOpts)),
    %% references longer than 255 bytes are rejected
    ?assertMatch({validation_failure, reference_too_long},
                 ?COMMAND_RESET_OFFSET:validate([], GoodOpts#{reference => gen_bin(256)})),
    ?assertMatch(ok, ?COMMAND_RESET_OFFSET:validate([], GoodOpts)),
    %% 255 bytes is the maximum accepted length
    ?assertMatch(ok, ?COMMAND_RESET_OFFSET:validate([], GoodOpts#{reference => gen_bin(255)})).
%% merge_defaults adds the default vhost ("/") and keeps an explicit vhost.
reset_offset_merge_defaults(_Config) ->
    Defaults = #{vhost => <<"/">>,
                 stream => <<"s1">>,
                 reference => <<"foo">>},
    ?assertEqual({[], Defaults},
                 ?COMMAND_RESET_OFFSET:merge_defaults(
                     [], maps:remove(vhost, Defaults))),
    WithVhost = maps:merge(Defaults, #{vhost => "vhost"}),
    ?assertEqual({[], WithVhost},
                 ?COMMAND_RESET_OFFSET:merge_defaults([], WithVhost)).
%% End-to-end check of the reset-offset command against a live broker:
%% stores an offset for a tracking reference, resets it, and verifies the
%% stored value goes back to 0.
reset_offset_run(Config) ->
    Cmd = ?COMMAND_RESET_OFFSET,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =#{node => Node,
            timeout => 10000,
            vhost => <<"/">>},
    Args = [],
    %% use the test case name as the stream name
    St = atom_to_binary(?FUNCTION_NAME, utf8),
    Ref = <<"foo">>,
    OptsGroup = maps:merge(#{stream => St, reference => Ref},
                           Opts),
    %% the stream does not exist yet
    ?assertMatch({error, not_found},
                 Cmd:run(Args, OptsGroup)),
    Port = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(Port),
    create_stream(S, St, C),
    %% the stream exists but nothing was stored under the reference yet
    ?assertEqual({error, no_reference}, Cmd:run(Args, OptsGroup)),
    store_offset(S, St, Ref, 42, C),
    check_stored_offset(S, St, Ref, 42, C),
    %% resetting brings the stored offset back to 0
    ?assertMatch(ok, Cmd:run(Args, OptsGroup)),
    check_stored_offset(S, St, Ref, 0, C),
    delete_stream(S, St, C),
    close(S, C),
    ok.
%% merge_defaults of the add-super-stream command. The original body was
%% corrupted by interleaved VCS-blame noise, removed here.
%% Defaults: 3 partitions and vhost "/"; when binding or routing keys are
%% supplied, the partitions default is dropped (they are mutually exclusive)
%% and routing_keys is normalized to binding_keys.
add_super_stream_merge_defaults(_Config) ->
    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 3, vhost := <<"/">>}},
                 ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
                                                          #{})),
    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 5, vhost := <<"/">>}},
                 ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
                                                          #{partitions => 5})),
    DefaultWithBindingKeys =
        ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
                                                 #{binding_keys =>
                                                       <<"amer,emea,apac">>}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 DefaultWithBindingKeys),
    %% no partitions default when binding keys are given
    {_, OptsBks} = DefaultWithBindingKeys,
    ?assertEqual(false, maps:is_key(partitions, OptsBks)),
    %% the legacy routing_keys option is rewritten to binding_keys
    DefaultWithRoutingKeys =
        ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
                                                 #{routing_keys =>
                                                       <<"amer,emea,apac">>}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 DefaultWithRoutingKeys),
    {_, OptsRks} = DefaultWithRoutingKeys,
    ?assertEqual(false, maps:is_key(partitions, OptsRks)).
%% Argument validation of the add-super-stream command. The original body
%% was corrupted by interleaved VCS-blame noise, removed here.
%% Checks arity, mutual exclusion of partitions vs routing/binding keys,
%% and the stream-argument options (retention, segment size, locator, ...).
add_super_stream_validate(_Config) ->
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_ADD_SUPER_STREAM:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})),
    %% partitions and routing/binding keys are mutually exclusive
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{partitions => 1,
                                                      routing_keys =>
                                                          <<"a,b,c">>})),
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{partitions => 1,
                                                      binding_keys => <<"a,b,c">>})),
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{routing_keys => 1,
                                                      binding_keys => <<"a,b,c">>})),
    %% the partition count must be strictly positive
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{partitions => 0})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{partitions => 5})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{routing_keys =>
                                                          <<"a,b,c">>})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                    #{binding_keys =>
                                                          <<"a,b,c">>})),
    %% table-driven checks of the stream-argument options
    [case Expected of
         ok ->
             ?assertEqual(ok,
                          ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts));
         error ->
             ?assertMatch({validation_failure, _},
                          ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts))
     end
     || {Opts, Expected}
            <- [{#{max_length_bytes => 1000}, ok},
                {#{max_length_bytes => <<"1000">>}, ok},
                {#{max_length_bytes => <<"100gb">>}, ok},
                {#{max_length_bytes => <<"50mb">>}, ok},
                {#{max_length_bytes => <<"50bm">>}, error},
                {#{max_age => <<"PT10M">>}, ok},
                {#{max_age => <<"P5DT8H">>}, ok},
                {#{max_age => <<"foo">>}, error},
                {#{stream_max_segment_size_bytes => 1000}, ok},
                {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
                {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
                {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
                {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
                {#{leader_locator => <<"client-local">>}, ok},
                {#{leader_locator => <<"least-leaders">>}, ok},
                {#{leader_locator => <<"random">>}, ok},
                {#{leader_locator => <<"foo">>}, error},
                {#{initial_cluster_size => <<"1">>}, ok},
                {#{initial_cluster_size => <<"2">>}, ok},
                {#{initial_cluster_size => <<"3">>}, ok},
                {#{initial_cluster_size => <<"0">>}, error},
                {#{initial_cluster_size => <<"-1">>}, error},
                {#{initial_cluster_size => <<"foo">>}, error}]],
    ok.
%% merge_defaults of the delete-super-stream command adds the default
%% vhost "/". (Interleaved VCS-blame noise removed from the original body.)
delete_super_stream_merge_defaults(_Config) ->
    ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:merge_defaults([<<"super-stream">>],
                                                                 #{})),
    ok.
%% Argument validation of the delete-super-stream command.
%% Fix: the happy-path assertion used to validate the *add* command
%% (?COMMAND_ADD_SUPER_STREAM), a copy-paste slip — it now exercises the
%% delete command under test. (Interleaved VCS-blame noise also removed.)
delete_super_stream_validate(_Config) ->
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>, <<"b">>],
                                                           #{})),
    ?assertEqual(ok, ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>], #{})),
    ok.
%% End-to-end test of adding and deleting super streams. The original body
%% was corrupted by interleaved VCS-blame noise, removed here.
%% Covers creation by partition count, by binding keys, and with extra
%% stream arguments, checking the resulting queue arguments each time.
add_delete_super_stream_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          vhost => <<"/">>},
    % with number of partitions
    ?assertMatch({ok, _},
                 ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
                                               maps:merge(#{partitions => 3},
                                                          Opts))),
    ?assertEqual({ok,
                  [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
                 partitions(Config, <<"invoices">>)),
    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, <<"invoices">>)),
    % with binding keys (whitespace around keys must be tolerated)
    ?assertMatch({ok, _},
                 ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
                                               maps:merge(#{binding_keys => <<" amer,emea , apac">>},
                                                          Opts))),
    ?assertEqual({ok,
                  [<<"invoices-amer">>, <<"invoices-emea">>,
                   <<"invoices-apac">>]},
                 partitions(Config, <<"invoices">>)),
    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, <<"invoices">>)),
    % with arguments: verify they are translated into queue x-arguments
    ExtraOptions =
        #{partitions => 3,
          max_length_bytes => <<"50mb">>,
          max_age => <<"PT10M">>,
          stream_max_segment_size_bytes => <<"1mb">>,
          leader_locator => <<"random">>,
          initial_cluster_size => <<"1">>},
    ?assertMatch({ok, _},
                 ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
                                               maps:merge(ExtraOptions, Opts))),
    {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
    Args = amqqueue:get_arguments(Q),
    ?assertMatch({_, <<"random">>},
                 rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)),
    ?assertMatch({_, 1},
                 rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)),
    ?assertMatch({_, 1000000},
                 rabbit_misc:table_lookup(Args,
                                          <<"x-stream-max-segment-size-bytes">>)),
    %% ISO-8601 PT10M is converted to seconds
    ?assertMatch({_, <<"600s">>},
                 rabbit_misc:table_lookup(Args, <<"x-max-age">>)),
    ?assertMatch({_, 50000000},
                 rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)),
    ?assertMatch({_, <<"stream">>},
                 rabbit_misc:table_lookup(Args, <<"x-queue-type">>)),
    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
    ok.
%% Resolves the partition streams of super stream Name in vhost "/" by
%% calling rabbit_stream_manager on broker node 0.
partitions(Config, Name) ->
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_stream_manager, partitions,
                                 [<<"/">>, Name]).
%% Creates Stream over the plain-TCP socket; returns the updated protocol state.
create_stream(Sock, Stream, Conn0) ->
    rabbit_stream_SUITE:test_create_stream(gen_tcp, Sock, Stream, Conn0).
%% Subscribes with explicit subscription properties, expecting an OK response.
subscribe(Sock, SubId, Stream, SubProperties, Conn) ->
    rabbit_stream_SUITE:test_subscribe(gen_tcp, Sock, SubId, Stream,
                                       SubProperties, ?RESPONSE_CODE_OK, Conn).
%% Subscribes without extra properties.
subscribe(Sock, SubId, Stream, Conn) ->
    rabbit_stream_SUITE:test_subscribe(gen_tcp, Sock, SubId, Stream, Conn).
%% Declares an anonymous publisher with id PubId on Stream.
declare_publisher(Sock, PubId, Stream, Conn) ->
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, Sock, PubId,
                                               Stream, Conn).
%% Deletes Stream over the plain-TCP socket.
delete_stream(Sock, Stream, Conn) ->
    rabbit_stream_SUITE:test_delete_stream(gen_tcp, Sock, Stream, Conn).
%% Deletes Stream without waiting for the metadata-update notification
%% (last argument 'false').
delete_stream_no_metadata_update(Sock, Stream, Conn) ->
    rabbit_stream_SUITE:test_delete_stream(gen_tcp, Sock, Stream, Conn, false).
%% Waits for the metadata-update frame signalling Stream deletion.
metadata_update_stream_deleted(Sock, Stream, Conn) ->
    rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, Sock,
                                                            Stream, Conn).
%% Gracefully closes the stream connection.
close(Sock, Conn) ->
    rabbit_stream_SUITE:test_close(gen_tcp, Sock, Conn).
2021-01-19 18:31:39 +08:00
options(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
verbose => false,
vhost => <<"/">>}, %% just for list_consumers and list_publishers
Opts.
%% Flattens a CLI command result into a flat list of records (each record
%% is a proplist). Results may come nested per-connection, so sublists are
%% flattened recursively.
%% Fix: the previous nested-list clause reversed the intermediate
%% accumulator (via the terminal clause) before continuing, scrambling the
%% record order whenever more than one sublist was flattened; records are
%% now appended in their original order. The redundant ([], []) clause is
%% also removed (lists:reverse([]) is []).
flatten_command_result([], Acc) ->
    lists:reverse(Acc);
flatten_command_result([[{_K, _V} | _RecordRest] = Record | Rest], Acc) ->
    %% a record: keep it as-is
    flatten_command_result(Rest, [Record | Acc]);
flatten_command_result([Nested | Rest], Acc) ->
    %% a sublist of records (e.g. from another connection): flatten it and
    %% push its records, preserving order, onto the accumulator
    flatten_command_result(Rest,
                           lists:reverse(flatten_command_result(Nested, []),
                                         Acc)).
%% Materializes a command-run result (an Elixir enumerable) as a flat
%% Erlang list of records; results may come from several connections.
to_list(CommandRun) ->
    flatten_command_result('Elixir.Enum':to_list(CommandRun), []).
%% Number of records returned by a command run.
command_result_count(CommandRun) ->
    erlang:length(to_list(CommandRun)).
%% Number of stream connections currently reported by the CLI.
connection_count(Config) ->
    Run = ?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], options(Config)),
    command_result_count(Run).
%% Number of stream consumers currently reported by the CLI.
consumer_count(Config) ->
    Run = ?COMMAND_LIST_CONSUMERS:run([<<"stream">>], options(Config)),
    command_result_count(Run).
%% Number of stream publishers currently reported by the CLI.
publisher_count(Config) ->
    Run = ?COMMAND_LIST_PUBLISHERS:run([<<"stream">>], options(Config)),
    command_result_count(Run).
%% Opens a plain-TCP stream connection.
start_stream_connection(StreamPort) ->
    start_stream_connection(gen_tcp, StreamPort).
%% Opens a TLS stream connection.
start_stream_tls_connection(StreamPort) ->
    start_stream_connection(ssl, StreamPort).
%% Opens a stream-protocol connection over Transport (gen_tcp | ssl) and
%% performs the peer-properties and authentication handshake. Returns
%% {Socket, ProtocolState}. (Interleaved VCS-blame noise removed from the
%% original body.)
start_stream_connection(Transport, Port) ->
    %% certificate verification is not the point of these tests
    TlsOpts = case Transport of
                  ssl -> [{verify, verify_none}];
                  _ -> []
              end,
    {ok, S} =
        Transport:connect("localhost", Port,
                          [{active, false}, {mode, binary}] ++ TlsOpts),
    C0 = rabbit_stream_core:init(0),
    C1 = rabbit_stream_SUITE:test_peer_properties(Transport, S, C0),
    C = rabbit_stream_SUITE:test_authenticate(Transport, S, C1),
    {S, C}.
%% Opens an AMQP 0-9-1 connection of the given type (network | direct).
start_amqp_connection(Type, Node, Port) ->
    {ok, _Connection} = amqp_connection:start(amqp_params(Type, Node, Port)).
%% Builds AMQP connection parameters for the given connection type.
amqp_params(network, _Node, Port) ->
    #amqp_params_network{port = Port};
amqp_params(direct, Node, _Port) ->
    #amqp_params_direct{node = Node}.
%% Looks up queue QName in vhost "/" on broker node 0.
queue_lookup(Config, QName) ->
    Resource = rabbit_misc:r(<<"/">>, queue, QName),
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_amqqueue, lookup,
                                 [Resource]).
%% Stores Value as the tracked offset for Reference on Stream, then polls
%% until the broker reports it (store_offset itself has no response frame).
store_offset(Sock, Stream, Reference, Value, Conn) ->
    Frame = rabbit_stream_core:frame({store_offset, Reference, Stream, Value}),
    ok = gen_tcp:send(Sock, Frame),
    case check_stored_offset(Sock, Stream, Reference, Value, Conn, 20) of
        ok -> ok;
        _ -> {error, offset_not_stored}
    end.
%% Polls the stored offset with the default number of attempts (20).
check_stored_offset(Sock, Stream, Reference, Expected, Conn) ->
    check_stored_offset(Sock, Stream, Reference, Expected, Conn, 20).
%% Queries the offset stored for Reference on Stream until it equals
%% Expected, retrying every 50 ms; returns ok or error when attempts run out.
check_stored_offset(_, _, _, _, _, 0) ->
    error;
check_stored_offset(Sock, Stream, Reference, Expected, Conn, AttemptsLeft) ->
    Frame =
        rabbit_stream_core:frame({request, 1, {query_offset, Reference, Stream}}),
    ok = gen_tcp:send(Sock, Frame),
    {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, Sock, Conn),
    ?assertMatch({response, 1, {query_offset, ?RESPONSE_CODE_OK, _}}, Cmd),
    {response, 1, {query_offset, ?RESPONSE_CODE_OK, Stored}} = Cmd,
    if
        Stored =:= Expected ->
            ok;
        true ->
            timer:sleep(50),
            check_stored_offset(Sock, Stream, Reference, Expected, Conn,
                                AttemptsLeft - 1)
    end.
%% Polls the publisher sequence with the default number of attempts (20).
check_publisher_sequence(Sock, Stream, Reference, Expected, Conn) ->
    check_publisher_sequence(Sock, Stream, Reference, Expected, Conn, 20).
%% Queries the publisher sequence stored for Reference on Stream until it
%% equals Expected, retrying every 50 ms; returns ok or error when attempts
%% run out.
check_publisher_sequence(_, _, _, _, _, 0) ->
    error;
check_publisher_sequence(Sock, Stream, Reference, Expected, Conn, AttemptsLeft) ->
    Frame =
        rabbit_stream_core:frame({request, 1,
                                  {query_publisher_sequence, Reference, Stream}}),
    ok = gen_tcp:send(Sock, Frame),
    {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, Sock, Conn),
    ?assertMatch({response, 1, {query_publisher_sequence, _, _}}, Cmd),
    {response, 1, {query_publisher_sequence, _, Stored}} = Cmd,
    if
        Stored =:= Expected ->
            ok;
        true ->
            timer:sleep(50),
            check_publisher_sequence(Sock, Stream, Reference, Expected, Conn,
                                     AttemptsLeft - 1)
    end.
%% Returns a binary of Size 'a' bytes (used to build over-long references).
gen_bin(Size) ->
    binary:copy(<<"a">>, Size).