%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
|
|
|
|
|
|
|
|
-module(commands_SUITE).
|
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
-compile(nowarn_export_all).
|
2021-01-19 18:31:39 +08:00
|
|
|
-compile([export_all]).
|
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
% -include_lib("common_test/include/ct.hrl").
|
2021-01-19 18:31:39 +08:00
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2021-05-27 18:14:55 +08:00
|
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
2021-06-11 22:58:17 +08:00
|
|
|
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
|
2021-01-19 18:31:39 +08:00
|
|
|
|
2021-05-27 18:14:55 +08:00
|
|
|
%% Timeout handed to ?awaitMatch throughout the suite
%% (presumably milliseconds -- confirm against rabbit_assert.hrl).
-define(WAIT, 5000).

%% CLI command modules under test. They are Elixir modules, hence the
%% quoted 'Elixir.'-prefixed atoms.

%% Listing commands.
-define(COMMAND_LIST_CONNECTIONS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-define(COMMAND_LIST_CONSUMERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
-define(COMMAND_LIST_PUBLISHERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').

%% Super stream commands.
-define(COMMAND_ADD_SUPER_STREAM,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
-define(COMMAND_DELETE_SUPER_STREAM_CLI,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').

%% Consumer group commands.
-define(COMMAND_LIST_CONSUMER_GROUPS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand').
-define(COMMAND_LIST_GROUP_CONSUMERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').

%% Tracking, consumer activation and offset reset commands.
-define(COMMAND_LIST_STREAM_TRACKING,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
-define(COMMAND_RESET_OFFSET,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Common Test callback: the test groups of this suite, in the order
%% they are listed here.
all() ->
    GroupNames =
        [list_connections,
         list_consumers,
         list_publishers,
         list_consumer_groups,
         list_group_consumers,
         activate_consumer,
         list_stream_tracking,
         reset_offset,
         super_streams],
    [{group, Name} || Name <- GroupNames].
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Common Test callback: maps every group name to its test cases.
%% All groups use an empty property list.
groups() ->
    CasesByGroup =
        [{list_connections,
          [list_connections_merge_defaults,
           list_connections_run,
           list_tls_connections_run]},
         {list_consumers,
          [list_consumers_merge_defaults,
           list_consumers_run]},
         {list_publishers,
          [list_publishers_merge_defaults,
           list_publishers_run]},
         {list_consumer_groups,
          [list_consumer_groups_validate,
           list_consumer_groups_merge_defaults,
           list_consumer_groups_run]},
         {list_group_consumers,
          [list_group_consumers_validate,
           list_group_consumers_merge_defaults,
           list_group_consumers_run]},
         {activate_consumer,
          [activate_consumer_validate,
           activate_consumer_merge_defaults,
           activate_consumer_run]},
         {list_stream_tracking,
          [list_stream_tracking_validate,
           list_stream_tracking_merge_defaults,
           list_stream_tracking_run]},
         {reset_offset,
          [reset_offset_validate,
           reset_offset_merge_defaults,
           reset_offset_run]},
         {super_streams,
          [add_super_stream_merge_defaults,
           add_super_stream_validate,
           delete_super_stream_merge_defaults,
           delete_super_stream_validate,
           add_delete_super_stream_run]}],
    [{Group, [], Cases} || {Group, Cases} <- CasesByGroup].
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Common Test callback run once before the suite.
%% Returns {skip, Reason} when rabbit_ct_helpers reports a mixed-version
%% run; otherwise stores the node name suffix and the TLS verification
%% setting in Config and runs the broker setup steps.
init_per_suite(Config) ->
    MixedVersions = rabbit_ct_helpers:is_mixed_versions(),
    case MixedVersions of
        true ->
            {skip,
             "mixed version clusters are not supported for "
             "this suite"};
        _ ->
            WithSuffix =
                rabbit_ct_helpers:set_config(
                  Config, [{rmq_nodename_suffix, ?MODULE}]),
            WithTls =
                rabbit_ct_helpers:set_config(
                  WithSuffix, {rabbitmq_ct_tls_verify, verify_none}),
            rabbit_ct_helpers:log_environment(),
            SetupSteps = rabbit_ct_broker_helpers:setup_steps(),
            rabbit_ct_helpers:run_setup_steps(WithTls, SetupSteps)
    end.
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Common Test callback run once after the suite: runs the broker
%% teardown steps and returns their result.
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
|
|
|
|
|
|
|
|
%% Common Test callback: no per-group setup, Config is returned untouched.
init_per_group(_Group, Config) ->
    Config.
|
|
|
|
|
|
|
|
%% Common Test callback: no per-group teardown, Config is returned untouched.
end_per_group(_Group, Config) ->
    Config.
|
|
|
|
|
|
|
|
%% Common Test callback run before each test case: delegates to
%% rabbit_ct_helpers:testcase_started/2 and returns its result.
init_per_testcase(Testcase, Config) ->
    Started = rabbit_ct_helpers:testcase_started(Config, Testcase),
    Started.
|
|
|
|
|
|
|
|
%% Common Test callback run after each test case: delegates to
%% rabbit_ct_helpers:testcase_finished/2 and returns its result.
end_per_testcase(Testcase, Config) ->
    Finished = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    Finished.
|
|
|
|
|
|
|
|
%% merge_defaults/2 of the list-connections command: with no arguments
%% the keys default to node and conn_name and verbose to false;
%% explicit keys and an explicit verbose flag are returned as given.
list_connections_merge_defaults(_Config) ->
    Merge =
        fun(Keys, Opts) ->
                ?COMMAND_LIST_CONNECTIONS:merge_defaults(Keys, Opts)
        end,
    OtherKey = [<<"other_key">>],

    {[<<"node">>, <<"conn_name">>], #{verbose := false}} = Merge([], #{}),

    {[<<"other_key">>], #{verbose := true}} =
        Merge(OtherKey, #{verbose => true}),

    {[<<"other_key">>], #{verbose := false}} =
        Merge(OtherKey, #{verbose => false}).
|
|
|
|
|
|
|
|
%% Runs the list-connections command against node 0 while stream
%% connections are opened one after the other, then checks that AMQP
%% connections are not listed and that verbose mode returns every
%% item of ?INFO_ITEMS.
list_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false},
    List =
        fun(Items, RunOpts) ->
                to_list(?COMMAND_LIST_CONNECTIONS:run(Items, RunOpts))
        end,
    ConnNames = fun() -> List([<<"conn_name">>], Opts) end,

    %% Nothing is connected yet
    [] = List([], Opts),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),

    %% First stream connection (plain TCP, so ssl is false)
    {Sock1, Conn1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    [[{conn_name, _}]] = ConnNames(),
    [[{ssl, false}]] = List([<<"ssl">>], Opts),

    %% Second stream connection
    {Sock2, Conn2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    [[{conn_name, _}], [{conn_name, _}]] = ConnNames(),

    AmqpPort =
        rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),

    %% A network AMQP connection does not add an entry
    start_amqp_connection(network, Node, AmqpPort),
    [[{conn_name, _}], [{conn_name, _}]] = ConnNames(),

    %% Neither does a direct AMQP 0-9-1 connection
    start_amqp_connection(direct, Node, AmqpPort),
    [[{conn_name, _}], [{conn_name, _}]] = ConnNames(),

    %% Verbose output must equal an explicit request for every info item
    ItemNames = [atom_to_binary(Item, utf8) || Item <- ?INFO_ITEMS],
    AllKeys = List(ItemNames, Opts),
    Verbose = List([], Opts#{verbose => true}),
    ?assertEqual(AllKeys, Verbose),

    %% Two connections are listed
    [FirstConn, _SecondConn] = AllKeys,

    %% The keys of a listed connection are exactly ?INFO_ITEMS
    ?assertEqual(length(?INFO_ITEMS), length(FirstConn)),
    {Keys, _} = lists:unzip(FirstConn),
    ?assertEqual([], Keys -- ?INFO_ITEMS),
    ?assertEqual([], ?INFO_ITEMS -- Keys),

    rabbit_stream_SUITE:test_close(gen_tcp, Sock1, Conn1),
    rabbit_stream_SUITE:test_close(gen_tcp, Sock2, Conn2),
    ok.
|
|
|
|
|
2021-05-27 16:43:33 +08:00
|
|
|
%% Opens one TLS stream connection and checks that the list-connections
%% command reports it with ssl set to true.
list_tls_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false},
    List =
        fun(Items) ->
                to_list(?COMMAND_LIST_CONNECTIONS:run(Items, Opts))
        end,

    %% Nothing is connected yet
    [] = List([]),

    TlsPort = rabbit_stream_SUITE:get_stream_port_tls(Config),
    application:ensure_all_started(ssl),

    {Sock, Conn} = start_stream_tls_connection(TlsPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    [[{conn_name, _}]] = List([<<"conn_name">>]),
    [[{ssl, true}]] = List([<<"ssl">>]),

    rabbit_stream_SUITE:test_close(ssl, Sock, Conn),
    ok.
|
|
|
|
|
2021-01-19 18:31:39 +08:00
|
|
|
%% merge_defaults/2 of the list-consumers command: with no arguments the
%% keys default to ?CONSUMER_INFO_ITEMS minus connection_pid and node
%% (as binaries) and verbose to false; explicit values are kept.
list_consumers_merge_defaults(_Config) ->
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?CONSUMER_INFO_ITEMS -- [connection_pid, node]),
    Merge =
        fun(Keys, Opts) ->
                ?COMMAND_LIST_CONSUMERS:merge_defaults(Keys, Opts)
        end,
    OtherKey = [<<"other_key">>],

    {DefaultItems, #{verbose := false}} = Merge([], #{}),

    {[<<"other_key">>], #{verbose := true}} =
        Merge(OtherKey, #{verbose => true}),

    {[<<"other_key">>], #{verbose := false}} =
        Merge(OtherKey, #{verbose => false}).
|
|
|
|
|
|
|
|
%% Runs the list-consumers command against node 0 with two stream
%% connections, each subscribed to the same stream, and checks that
%% verbose mode returns every item of ?CONSUMER_INFO_ITEMS.
%%
%% The connection state returned by every protocol helper is threaded
%% into the next call, including the state returned by
%% metadata_update_stream_deleted/3, so close/2 never receives a stale
%% state (same handling as in list_publishers_run/1).
list_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          verbose => false,
          vhost => <<"/">>},

    %% No connections, no consumers
    [] = to_list(?COMMAND_LIST_CONSUMERS:run([], Opts)),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    Stream = <<"list_consumers_run">>,
    C1_1 = create_stream(S1, Stream, C1),
    SubId = 42,
    C1_2 = subscribe(S1, SubId, Stream, C1_1),

    ?awaitMatch(1, consumer_count(Config), ?WAIT),

    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    C2_1 = subscribe(S2, SubId, Stream, C2),

    ?awaitMatch(2, consumer_count(Config), ?WAIT),

    %% Verbose returns all keys
    InfoItems = ?CONSUMER_INFO_ITEMS,
    Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
    AllKeys = to_list(?COMMAND_LIST_CONSUMERS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONSUMERS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),

    %% There are two consumers
    [First, _Second] = AllKeys,

    %% Keys are info items
    ?assertEqual(length(InfoItems), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),

    C1_3 = delete_stream(S1, Stream, C1_2),
    %% NOTE(review): only the second connection is checked for the
    %% stream-deleted metadata update; presumably the connection that
    %% issued the delete is not expected to receive one -- confirm.
    C2_2 = metadata_update_stream_deleted(S2, Stream, C2_1),
    close(S1, C1_3),
    close(S2, C2_2),

    ?awaitMatch(0, consumer_count(Config), ?WAIT),
    ok.
|
|
|
|
|
2021-01-19 21:49:30 +08:00
|
|
|
%% merge_defaults/2 of the list-publishers command: with no arguments the
%% keys default to ?PUBLISHER_INFO_ITEMS minus connection_pid and node
%% (as binaries) and verbose to false; explicit values are kept.
list_publishers_merge_defaults(_Config) ->
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?PUBLISHER_INFO_ITEMS -- [connection_pid, node]),
    Merge =
        fun(Keys, Opts) ->
                ?COMMAND_LIST_PUBLISHERS:merge_defaults(Keys, Opts)
        end,
    OtherKey = [<<"other_key">>],

    {DefaultItems, #{verbose := false}} = Merge([], #{}),

    {[<<"other_key">>], #{verbose := true}} =
        Merge(OtherKey, #{verbose => true}),

    {[<<"other_key">>], #{verbose := false}} =
        Merge(OtherKey, #{verbose => false}).
|
|
|
|
|
|
|
|
%% Runs the list-publishers command against node 0 with two stream
%% connections, each declaring a publisher on the same stream, and
%% checks that verbose mode returns every item of ?PUBLISHER_INFO_ITEMS.
list_publishers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false,
             vhost => <<"/">>},
    List =
        fun(Items, RunOpts) ->
                to_list(?COMMAND_LIST_PUBLISHERS:run(Items, RunOpts))
        end,

    %% Nothing is connected, so nothing is published
    [] = List([], Opts),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    Stream = <<"list_publishers_run">>,
    PubId = 42,

    %% First connection: creates the stream and declares a publisher
    {Sock1, ConnA0} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    ConnA1 = create_stream(Sock1, Stream, ConnA0),
    ConnA2 = declare_publisher(Sock1, PubId, Stream, ConnA1),
    ?awaitMatch(1, publisher_count(Config), ?WAIT),

    %% Second connection: declares a publisher on the same stream
    {Sock2, ConnB0} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    ConnB1 = declare_publisher(Sock2, PubId, Stream, ConnB0),
    ?awaitMatch(2, publisher_count(Config), ?WAIT),

    %% Verbose output must equal an explicit request for every info item
    InfoItems = ?PUBLISHER_INFO_ITEMS,
    ItemNames = [atom_to_binary(Item, utf8) || Item <- InfoItems],
    AllKeys = List(ItemNames, Opts),
    Verbose = List([], Opts#{verbose => true}),
    ?assertEqual(AllKeys, Verbose),

    %% Two publishers are listed
    [FirstPublisher, _SecondPublisher] = AllKeys,

    %% The keys of a listed publisher are exactly the info items
    ?assertEqual(length(InfoItems), length(FirstPublisher)),
    {Keys, _} = lists:unzip(FirstPublisher),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),

    ConnA3 = delete_stream(Sock1, Stream, ConnA2),
    %% NOTE(review): only the second connection is checked for the
    %% stream-deleted metadata update; presumably the connection that
    %% issued the delete is not expected to receive one -- confirm.
    ConnB2 = metadata_update_stream_deleted(Sock2, Stream, ConnB1),
    close(Sock1, ConnA3),
    close(Sock2, ConnB2),

    ?awaitMatch(0, publisher_count(Config), ?WAIT),
    ok.
|
|
|
|
|
2023-10-10 22:44:05 +08:00
|
|
|
%% validate/2 of the list-consumer-groups command: an unknown info key
%% is rejected, a known key and an empty key list are accepted.
list_consumer_groups_validate(_) ->
    ValidOpts = #{vhost => <<"/">>},
    Validate =
        fun(Keys) ->
                ?COMMAND_LIST_CONSUMER_GROUPS:validate(Keys, ValidOpts)
        end,
    ?assertMatch({validation_failure, {bad_info_key, [foo]}},
                 Validate([<<"foo">>])),
    ?assertMatch(ok, Validate([<<"reference">>])),
    ?assertMatch(ok, Validate([])).
|
|
|
|
|
|
|
|
|
2022-01-27 16:39:35 +08:00
|
|
|
%% merge_defaults/2 of the list-consumer-groups command: with no
%% arguments the keys default to ?CONSUMER_GROUP_INFO_ITEMS (as
%% binaries), verbose to false and vhost to <<"/">>; explicit keys and
%% an explicit verbose flag are kept.
list_consumer_groups_merge_defaults(_Config) ->
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?CONSUMER_GROUP_INFO_ITEMS),
    Merge =
        fun(Keys, Opts) ->
                ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults(Keys, Opts)
        end,
    OtherKey = [<<"other_key">>],

    {DefaultItems, #{verbose := false, vhost := <<"/">>}} = Merge([], #{}),

    {[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} =
        Merge(OtherKey, #{verbose => true}),

    {[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} =
        Merge(OtherKey, #{verbose => false}).
|
2022-01-27 16:39:35 +08:00
|
|
|
|
|
|
|
%% Runs the list-consumer-groups command against node 0 while one
%% connection subscribes three single-active consumers to each of two
%% streams, all sharing the same consumer reference. One group per
%% stream must be listed, and none once the streams are deleted and the
%% connection is closed.
list_consumer_groups_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>,
             verbose => true},
    ListGroups = fun() -> ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts) end,

    %% Nothing is connected, so there is no group
    {ok, []} = ListGroups(),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {Sock, Conn0} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    ConsumerReference = <<"foo">>,
    SubProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => ConsumerReference},

    %% First stream: subscriptions 0, 1 and 2; a consumer update is
    %% handled for subscription 0 right after it subscribes
    Stream1 = <<"list_consumer_groups_run_1">>,
    Conn1 = create_stream(Sock, Stream1, Conn0),
    Conn2 = subscribe(Sock, 0, Stream1, SubProperties, Conn1),
    Conn3 = handle_consumer_update(Sock, Conn2, 0),
    Conn4 = subscribe(Sock, 1, Stream1, SubProperties, Conn3),
    Conn5 = subscribe(Sock, 2, Stream1, SubProperties, Conn4),

    ?awaitMatch(3, consumer_count(Config), ?WAIT),

    {ok, [CG1]} = ListGroups(),
    assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),

    %% Second stream: subscriptions 3, 4 and 5, same sequence
    Stream2 = <<"list_consumer_groups_run_2">>,
    Conn6 = create_stream(Sock, Stream2, Conn5),
    Conn7 = subscribe(Sock, 3, Stream2, SubProperties, Conn6),
    Conn8 = handle_consumer_update(Sock, Conn7, 3),
    Conn9 = subscribe(Sock, 4, Stream2, SubProperties, Conn8),
    Conn10 = subscribe(Sock, 5, Stream2, SubProperties, Conn9),

    ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),

    %% CG1 is already bound: the first group must be listed unchanged
    {ok, [CG1, CG2]} = ListGroups(),
    assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
    assertConsumerGroup(Stream2, ConsumerReference, -1, 3, CG2),

    Conn11 = delete_stream(Sock, Stream1, Conn10),
    Conn12 = delete_stream(Sock, Stream2, Conn11),

    close(Sock, Conn12),
    {ok, []} = ListGroups(),
    ok.
|
|
|
|
|
2022-01-28 17:55:57 +08:00
|
|
|
%% validate/2 of the list-group-consumers command: options without
%% stream and reference fail with not_enough_args, an unknown info key
%% is rejected, a known key and an empty key list are accepted.
list_group_consumers_validate(_) ->
    ValidOpts = #{vhost => <<"/">>,
                  stream => <<"s1">>,
                  reference => <<"foo">>},
    Validate =
        fun(Keys, Opts) ->
                ?COMMAND_LIST_GROUP_CONSUMERS:validate(Keys, Opts)
        end,
    ?assertMatch({validation_failure, not_enough_args},
                 Validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 Validate([], #{vhost => <<"test">>})),
    ?assertMatch({validation_failure, {bad_info_key, [foo]}},
                 Validate([<<"foo">>], ValidOpts)),
    ?assertMatch(ok, Validate([<<"subscription_id">>], ValidOpts)),
    ?assertMatch(ok, Validate([], ValidOpts)).
|
|
|
|
|
|
|
|
%% Exercises merge_defaults/2 of the "list group consumers" command.
%% With no arguments every item of ?GROUP_CONSUMER_INFO_ITEMS (as binaries)
%% is expected, together with verbose=false and the "/" vhost. Explicit
%% arguments and an explicit verbose flag must be passed through unchanged.
list_group_consumers_merge_defaults(_Config) ->
    Cmd = ?COMMAND_LIST_GROUP_CONSUMERS,
    AllItems = lists:map(fun rabbit_data_coercion:to_binary/1,
                         ?GROUP_CONSUMER_INFO_ITEMS),
    {AllItems, #{verbose := false, vhost := <<"/">>}} =
        Cmd:merge_defaults([], #{}),

    Keys = [<<"other_key">>],
    {Keys, #{verbose := true, vhost := <<"/">>}} =
        Cmd:merge_defaults(Keys, #{verbose => true}),
    {Keys, #{verbose := false, vhost := <<"/">>}} =
        Cmd:merge_defaults(Keys, #{verbose => false}).
|
|
|
|
|
|
|
|
%% End-to-end check of the "list group consumers" command.
%%
%% Over a single stream connection, two streams are created and three
%% subscriptions sharing the same reference and the single-active-consumer
%% property are declared on each. For both groups the command must report the
%% first subscription as "active (connected)" and the other two as
%% "waiting (connected)". Before the first group exists, and once the streams
%% are deleted, the command must return {error, not_found}.
list_group_consumers_run(Config) ->
    Cmd = ?COMMAND_LIST_GROUP_CONSUMERS,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    BaseOpts = #{node => Node,
                 timeout => 10000,
                 vhost => <<"/">>,
                 verbose => false},
    Columns = [<<"subscription_id">>, <<"state">>],
    Reference = <<"foo">>,

    FirstStream = <<"list_group_consumers_run_1">>,
    FirstGroupOpts = BaseOpts#{stream => FirstStream, reference => Reference},

    %% the group does not exist yet
    {error, not_found} = Cmd:run(Columns, FirstGroupOpts),

    Port = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(Port),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    SacProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => Reference},

    create_stream(S, FirstStream, C),
    subscribe(S, 0, FirstStream, SacProperties, C),
    %% answer the consumer update request sent for the first subscription
    handle_consumer_update(S, C, 0),
    subscribe(S, 1, FirstStream, SacProperties, C),
    subscribe(S, 2, FirstStream, SacProperties, C),

    ?awaitMatch(3, consumer_count(Config), ?WAIT),

    {ok, FirstGroup} = Cmd:run(Columns, FirstGroupOpts),
    ?assertEqual([[{subscription_id, 0}, {state, "active (connected)"}],
                  [{subscription_id, 1}, {state, "waiting (connected)"}],
                  [{subscription_id, 2}, {state, "waiting (connected)"}]],
                 FirstGroup),

    SecondStream = <<"list_group_consumers_run_2">>,
    SecondGroupOpts = BaseOpts#{stream => SecondStream, reference => Reference},

    create_stream(S, SecondStream, C),
    subscribe(S, 3, SecondStream, SacProperties, C),
    handle_consumer_update(S, C, 3),
    subscribe(S, 4, SecondStream, SacProperties, C),
    subscribe(S, 5, SecondStream, SacProperties, C),

    %% three consumers per group
    ?awaitMatch(6, consumer_count(Config), ?WAIT),

    {ok, SecondGroup} = Cmd:run(Columns, SecondGroupOpts),
    ?assertEqual([[{subscription_id, 3}, {state, "active (connected)"}],
                  [{subscription_id, 4}, {state, "waiting (connected)"}],
                  [{subscription_id, 5}, {state, "waiting (connected)"}]],
                 SecondGroup),

    delete_stream(S, FirstStream, C),
    delete_stream(S, SecondStream, C),

    {error, not_found} = Cmd:run(Columns, SecondGroupOpts),

    close(S, C),
    ok.
|
|
|
|
|
2025-06-10 22:51:08 +08:00
|
|
|
%% Exercises validate/2 of the "activate stream consumer" command: the
%% command takes no positional argument and requires the stream and
%% reference options.
activate_consumer_validate(_) ->
    Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
    CompleteOpts = #{vhost => <<"/">>,
                     stream => <<"s1">>,
                     reference => <<"foo">>},
    VhostOnly = #{vhost => <<"test">>},

    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], VhostOnly)),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"foo">>], CompleteOpts)),
    ?assertMatch(ok, Cmd:validate([], CompleteOpts)).
|
|
|
|
|
|
|
|
%% Exercises merge_defaults/2 of the "activate stream consumer" command:
%% a missing vhost is expected to default to "/", while a vhost that is
%% already present must be left untouched.
activate_consumer_merge_defaults(_Config) ->
    Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
    WithDefaultVhost = #{vhost => <<"/">>,
                         stream => <<"s1">>,
                         reference => <<"foo">>},
    WithoutVhost = maps:remove(vhost, WithDefaultVhost),
    ?assertEqual({[], WithDefaultVhost},
                 Cmd:merge_defaults([], WithoutVhost)),

    WithCustomVhost = WithDefaultVhost#{vhost => "vhost"},
    ?assertEqual({[], WithCustomVhost},
                 Cmd:merge_defaults([], WithCustomVhost)).
|
|
|
|
|
|
|
|
%% End-to-end check of the "activate stream consumer" command.
%% The command must return {error, not_found} while the group is unknown and
%% ok once three single-active-consumer subscriptions exist on the stream.
%% The stream is named after this test function.
activate_consumer_run(Config) ->
    Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    BaseOpts = #{node => Node,
                 timeout => 10000,
                 vhost => <<"/">>},
    NoArgs = [],

    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    Reference = <<"foo">>,
    GroupOpts = BaseOpts#{stream => Stream, reference => Reference},

    %% the group does not exist yet
    ?assertEqual({error, not_found}, Cmd:run(NoArgs, GroupOpts)),

    Port = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(Port),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    SacProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => Reference},

    create_stream(S, Stream, C),
    subscribe(S, 0, Stream, SacProperties, C),
    %% answer the consumer update request sent for the first subscription
    handle_consumer_update(S, C, 0),
    subscribe(S, 1, Stream, SacProperties, C),
    subscribe(S, 2, Stream, SacProperties, C),

    ?awaitMatch(3, consumer_count(Config), ?WAIT),

    ?assertEqual(ok, Cmd:run(NoArgs, GroupOpts)),

    delete_stream(S, Stream, C),
    close(S, C),
    ok.
|
|
|
|
|
2022-01-27 16:39:35 +08:00
|
|
|
%% Waits for a consumer_update request flagged `true` for subscription SubId
%% on socket S, and answers it with an OK response carrying the `next`
%% offset specification. Crashes (badmatch) if any other command is received.
%% Returns the connection state produced by receive_commands/3.
handle_consumer_update(S, C0, SubId) ->
    {Request, C1} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0),
    {request, CorrelationId, {consumer_update, SubId, true}} = Request,
    Response = {response, CorrelationId,
                {consumer_update, ?RESPONSE_CODE_OK, next}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Response)),
    C1.
|
|
|
|
|
|
|
|
%% Asserts that the proplist Record describes the expected consumer group:
%% S is the expected `stream`, R the `reference`, PI the `partition_index`
%% and Cs the `consumers` value. Fields are checked in that order; returns ok.
assertConsumerGroup(S, R, PI, Cs, Record) ->
    lists:foreach(
      fun({Key, Expected}) ->
              ?assertEqual(Expected, proplists:get_value(Key, Record))
      end,
      [{stream, S},
       {reference, R},
       {partition_index, PI},
       {consumers, Cs}]).
|
|
|
|
|
2023-10-05 22:01:18 +08:00
|
|
|
%% Exercises validate/2 of the "list stream tracking" command:
%%  * the stream name is a mandatory, single positional argument,
%%  * combining several of the all/offset/writer flags is rejected,
%%  * a single flag with a single stream name is accepted.
list_stream_tracking_validate(_) ->
    Cmd = ?COMMAND_LIST_STREAM_TRACKING,
    %% Option keys are atoms everywhere else in this suite (see the
    %% #{all => true, writer => true} map below); the former binary key
    %% <<"writer">> meant the writer flag was never actually exercised.
    ValidOpts = #{vhost => <<"/">>, writer => true},

    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{vhost => <<"test">>})),
    ?assertMatch({validation_failure, "Specify only one of --all, --offset, --writer."},
                 Cmd:validate([<<"stream">>],
                              #{all => true, writer => true})),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"stream">>, <<"bad">>], ValidOpts)),

    ?assertMatch(ok,
                 Cmd:validate([<<"stream">>], ValidOpts)).
|
|
|
|
%% Exercises merge_defaults/2 of the "list stream tracking" command:
%% `all` is expected to be switched on when no flag is given, the vhost is
%% expected to default to "/", and explicit values must be preserved.
list_stream_tracking_merge_defaults(_Config) ->
    Cmd = ?COMMAND_LIST_STREAM_TRACKING,
    StreamArg = [<<"s">>],

    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"/">>}},
                 Cmd:merge_defaults(StreamArg, #{})),

    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"/">>}},
                 Cmd:merge_defaults(StreamArg, #{writer => true})),

    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"dev">>}},
                 Cmd:merge_defaults(StreamArg, #{vhost => <<"dev">>})),

    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"dev">>}},
                 Cmd:merge_defaults(StreamArg,
                                    #{writer => true, vhost => <<"dev">>})).
|
|
|
|
|
|
|
|
%% End-to-end check of the "list stream tracking" command.
%%
%% Steps:
%%  1. the command fails while the stream does not exist,
%%  2. an empty stream has no tracking information,
%%  3. stored consumer offsets show up with --all and --offset, and follow
%%     updates of the stored value,
%%  4. publisher sequences show up with --all and --writer, and follow
%%     newly confirmed publishing ids.
list_stream_tracking_run(Config) ->
    Cmd = ?COMMAND_LIST_STREAM_TRACKING,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Stream = <<"list_stream_tracking_run">>,
    ConsumerReference = <<"foo">>,
    PublisherReference = <<"bar">>,
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>},
    Args = [Stream],

    %% the stream does not exist yet
    ?assertMatch({error, "The stream does not exist."},
                 Cmd:run(Args, Opts#{all => true})),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    create_stream(S, Stream, C),

    ?assertMatch([],
                 Cmd:run(Args, Opts#{all => true})),

    %% store_offset/5 returns {error, offset_not_stored} when the value
    %% never becomes visible; fail right here instead of in a later,
    %% less explicit assertion (same as the second store_offset call below)
    ok = store_offset(S, Stream, ConsumerReference, 42, C),

    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 42}]],
                 Cmd:run(Args, Opts#{all => true})),

    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 42}]],
                 Cmd:run(Args, Opts#{offset => true})),

    ok = store_offset(S, Stream, ConsumerReference, 55, C),
    ?assertMatch([[{type, offset}, {name, ConsumerReference}, {tracking_value, 55}]],
                 Cmd:run(Args, Opts#{offset => true})),

    PublisherId = 1,
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PublisherId,
                                               PublisherReference, Stream, C),
    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 42, <<"">>, C),

    ok = check_publisher_sequence(S, Stream, PublisherReference, 42, C),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 42}],
                  [{type, offset}, {name, <<"foo">>}, {tracking_value, 55}]
                 ],
                 Cmd:run(Args, Opts#{all => true})),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 42}]
                 ],
                 Cmd:run(Args, Opts#{writer => true})),

    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 66, <<"">>, C),

    ok = check_publisher_sequence(S, Stream, PublisherReference, 66, C),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 66}]
                 ],
                 Cmd:run(Args, Opts#{writer => true})),

    delete_stream(S, Stream, C),

    close(S, C),
    ok.
|
|
|
|
|
2025-07-01 17:16:48 +08:00
|
|
|
%% Exercises validate/2 of the "reset offset" command: no positional
%% argument is accepted, stream and reference options are mandatory, and a
%% 256-byte reference is rejected while a 255-byte one is accepted.
reset_offset_validate(_) ->
    Cmd = ?COMMAND_RESET_OFFSET,
    CompleteOpts = #{vhost => <<"/">>,
                     stream => <<"s1">>,
                     reference => <<"foo">>},

    %% rejected invocations
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{vhost => <<"test">>})),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"foo">>], CompleteOpts)),
    ?assertMatch({validation_failure, reference_too_long},
                 Cmd:validate([], CompleteOpts#{reference => gen_bin(256)})),

    %% accepted invocations
    ?assertMatch(ok, Cmd:validate([], CompleteOpts)),
    ?assertMatch(ok, Cmd:validate([], CompleteOpts#{reference => gen_bin(255)})).
|
|
|
|
|
|
|
|
%% Exercises merge_defaults/2 of the "reset offset" command: a missing vhost
%% is expected to default to "/", an explicit vhost must be kept as is.
reset_offset_merge_defaults(_Config) ->
    Cmd = ?COMMAND_RESET_OFFSET,
    WithDefaultVhost = #{vhost => <<"/">>,
                         stream => <<"s1">>,
                         reference => <<"foo">>},
    WithoutVhost = maps:remove(vhost, WithDefaultVhost),
    ?assertEqual({[], WithDefaultVhost},
                 Cmd:merge_defaults([], WithoutVhost)),

    WithCustomVhost = WithDefaultVhost#{vhost => "vhost"},
    ?assertEqual({[], WithCustomVhost},
                 Cmd:merge_defaults([], WithCustomVhost)).
|
|
|
|
|
|
|
|
%% End-to-end check of the "reset offset" command.
%%
%% Steps:
%%  1. {error, not_found} while the stream does not exist,
%%  2. {error, no_reference} while nothing is stored for the reference,
%%  3. after storing offset 42, the command returns ok and the stored
%%     offset becomes 0.
%% The stream is named after this test function.
reset_offset_run(Config) ->
    Cmd = ?COMMAND_RESET_OFFSET,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>},
    Args = [],

    St = atom_to_binary(?FUNCTION_NAME, utf8),
    Ref = <<"foo">>,
    OptsGroup = maps:merge(#{stream => St, reference => Ref},
                           Opts),

    %% the stream does not exist yet
    ?assertMatch({error, not_found},
                 Cmd:run(Args, OptsGroup)),

    Port = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(Port),
    create_stream(S, St, C),

    ?assertEqual({error, no_reference}, Cmd:run(Args, OptsGroup)),

    %% store_offset/5 and check_stored_offset/5 report failure through
    %% their return value (they do not crash), so the results must be
    %% asserted: otherwise the test passes even if the offset is never
    %% stored or never reset.
    ?assertEqual(ok, store_offset(S, St, Ref, 42, C)),

    ?assertEqual(ok, check_stored_offset(S, St, Ref, 42, C)),
    ?assertMatch(ok, Cmd:run(Args, OptsGroup)),
    ?assertEqual(ok, check_stored_offset(S, St, Ref, 0, C)),

    delete_stream(S, St, C),
    close(S, C),
    ok.
|
|
|
|
|
2021-09-24 21:18:39 +08:00
|
|
|
%% Exercises merge_defaults/2 of the "add super stream" command:
%%  * 3 partitions and the "/" vhost are expected by default,
%%  * an explicit partition count is preserved,
%%  * when binding keys are given no `partitions` default is added,
%%  * `routing_keys` is expected to come back under `binding_keys`,
%%    again without a `partitions` default.
add_super_stream_merge_defaults(_Config) ->
    Cmd = ?COMMAND_ADD_SUPER_STREAM,
    Name = [<<"super-stream">>],

    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 3, vhost := <<"/">>}},
                 Cmd:merge_defaults(Name, #{})),

    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 5, vhost := <<"/">>}},
                 Cmd:merge_defaults(Name, #{partitions => 5})),

    FromBindingKeys =
        Cmd:merge_defaults(Name, #{binding_keys => <<"amer,emea,apac">>}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 FromBindingKeys),
    {_, BindingKeysOpts} = FromBindingKeys,
    ?assertEqual(false, maps:is_key(partitions, BindingKeysOpts)),

    FromRoutingKeys =
        Cmd:merge_defaults(Name, #{routing_keys => <<"amer,emea,apac">>}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 FromRoutingKeys),
    {_, RoutingKeysOpts} = FromRoutingKeys,
    ?assertEqual(false, maps:is_key(partitions, RoutingKeysOpts)).
|
2021-09-24 21:18:39 +08:00
|
|
|
|
|
|
|
%% Exercises validate/2 of the "add super stream" command.
%% The first part covers the argument count and the combinations of
%% partitions / routing keys / binding keys that are rejected or accepted.
%% The second part is table driven: each {Options, ok | error} pair states
%% whether the given stream argument value must be accepted.
add_super_stream_validate(_Config) ->
    Cmd = ?COMMAND_ADD_SUPER_STREAM,
    Name = [<<"a">>],

    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"a">>, <<"b">>], #{})),

    %% partitions and keys given together
    ?assertMatch({validation_failure, _},
                 Cmd:validate(Name, #{partitions => 1,
                                      routing_keys => <<"a,b,c">>})),
    ?assertMatch({validation_failure, _},
                 Cmd:validate(Name, #{partitions => 1,
                                      binding_keys => <<"a,b,c">>})),

    %% routing keys and binding keys given together
    ?assertMatch({validation_failure, _},
                 Cmd:validate(Name, #{routing_keys => 1,
                                      binding_keys => <<"a,b,c">>})),

    ?assertMatch({validation_failure, _},
                 Cmd:validate(Name, #{partitions => 0})),
    ?assertEqual(ok,
                 Cmd:validate(Name, #{partitions => 5})),
    ?assertEqual(ok,
                 Cmd:validate(Name, #{routing_keys => <<"a,b,c">>})),
    ?assertEqual(ok,
                 Cmd:validate(Name, #{binding_keys => <<"a,b,c">>})),

    Cases =
        [{#{max_length_bytes => 1000}, ok},
         {#{max_length_bytes => <<"1000">>}, ok},
         {#{max_length_bytes => <<"100gb">>}, ok},
         {#{max_length_bytes => <<"50mb">>}, ok},
         {#{max_length_bytes => <<"50bm">>}, error},
         {#{max_age => <<"PT10M">>}, ok},
         {#{max_age => <<"P5DT8H">>}, ok},
         {#{max_age => <<"foo">>}, error},
         {#{stream_max_segment_size_bytes => 1000}, ok},
         {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
         {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
         {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
         {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
         {#{leader_locator => <<"client-local">>}, ok},
         {#{leader_locator => <<"least-leaders">>}, ok},
         {#{leader_locator => <<"random">>}, ok},
         {#{leader_locator => <<"foo">>}, error},
         {#{initial_cluster_size => <<"1">>}, ok},
         {#{initial_cluster_size => <<"2">>}, ok},
         {#{initial_cluster_size => <<"3">>}, ok},
         {#{initial_cluster_size => <<"0">>}, error},
         {#{initial_cluster_size => <<"-1">>}, error},
         {#{initial_cluster_size => <<"foo">>}, error}],
    lists:foreach(
      fun({Opts, ok}) ->
              ?assertEqual(ok, Cmd:validate([<<"a">>], Opts));
         ({Opts, error}) ->
              ?assertMatch({validation_failure, _},
                           Cmd:validate([<<"a">>], Opts))
      end,
      Cases),
    ok.
|
|
|
|
|
2021-09-24 22:27:47 +08:00
|
|
|
%% Exercises merge_defaults/2 of the "delete super stream" command: the
%% arguments are returned unchanged and the vhost is expected to default
%% to "/".
delete_super_stream_merge_defaults(_Config) ->
    Cmd = ?COMMAND_DELETE_SUPER_STREAM_CLI,
    Merged = Cmd:merge_defaults([<<"super-stream">>], #{}),
    ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, Merged),
    ok.
|
|
|
|
|
|
|
|
%% Exercises validate/2 of the "delete super stream" command: exactly one
%% positional argument (the super stream name) is accepted.
delete_super_stream_validate(_Config) ->
    Cmd = ?COMMAND_DELETE_SUPER_STREAM_CLI,
    ?assertMatch({validation_failure, not_enough_args},
                 Cmd:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 Cmd:validate([<<"a">>, <<"b">>], #{})),
    %% The success case used to call the *add* super stream command
    %% (copy-paste leftover), leaving the delete command's happy path
    %% untested.
    ?assertEqual(ok, Cmd:validate([<<"a">>], #{})),
    ok.
|
|
|
|
|
|
|
|
%% End-to-end check of the "add super stream" and "delete super stream"
%% commands on the "invoices" super stream, in three rounds:
%%  1. creation from a number of partitions,
%%  2. creation from binding keys (surrounding blanks in the key list are
%%     expected to be ignored),
%%  3. creation with extra stream arguments, which must end up as queue
%%     arguments of the first partition.
%% After rounds 1 and 2 the partitions are checked and the super stream is
%% deleted and verified gone; round 3 only deletes it.
add_delete_super_stream_run(Config) ->
    AddCmd = ?COMMAND_ADD_SUPER_STREAM,
    DeleteCmd = ?COMMAND_DELETE_SUPER_STREAM_CLI,
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>},
    SuperStream = <<"invoices">>,

    % with number of partitions
    ?assertMatch({ok, _},
                 AddCmd:run([SuperStream], Opts#{partitions => 3})),
    ?assertEqual({ok,
                  [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
                 partitions(Config, SuperStream)),
    ?assertMatch({ok, _},
                 DeleteCmd:run([SuperStream], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, SuperStream)),

    % with binding keys
    ?assertMatch({ok, _},
                 AddCmd:run([SuperStream],
                            Opts#{binding_keys => <<" amer,emea , apac">>})),
    ?assertEqual({ok,
                  [<<"invoices-amer">>, <<"invoices-emea">>,
                   <<"invoices-apac">>]},
                 partitions(Config, SuperStream)),
    ?assertMatch({ok, _},
                 DeleteCmd:run([SuperStream], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, SuperStream)),

    % with arguments
    ExtraOptions =
        #{partitions => 3,
          max_length_bytes => <<"50mb">>,
          max_age => <<"PT10M">>,
          stream_max_segment_size_bytes => <<"1mb">>,
          leader_locator => <<"random">>,
          initial_cluster_size => <<"1">>},

    ?assertMatch({ok, _},
                 AddCmd:run([SuperStream], maps:merge(ExtraOptions, Opts))),

    {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
    QueueArgs = amqqueue:get_arguments(Q),
    lists:foreach(
      fun({ArgName, ArgValue}) ->
              ?assertMatch({_, ArgValue},
                           rabbit_misc:table_lookup(QueueArgs, ArgName))
      end,
      [{<<"x-queue-leader-locator">>, <<"random">>},
       {<<"x-initial-cluster-size">>, 1},
       {<<"x-stream-max-segment-size-bytes">>, 1000000},
       {<<"x-max-age">>, <<"600s">>},
       {<<"x-max-length-bytes">>, 50000000},
       {<<"x-queue-type">>, <<"stream">>}]),

    ?assertMatch({ok, _},
                 DeleteCmd:run([SuperStream], Opts)),

    ok.
|
|
|
|
|
2021-09-25 00:18:55 +08:00
|
|
|
%% Calls rabbit_stream_manager:partitions/2 on broker node 0 for the
%% super stream Name in the "/" vhost and returns its result.
partitions(Config, Name) ->
    VHost = <<"/">>,
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_stream_manager, partitions,
                                 [VHost, Name]).
|
2021-09-24 21:18:39 +08:00
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Creates Stream over the plain TCP socket S by delegating to
%% rabbit_stream_SUITE:test_create_stream/4; C0 is the connection state.
create_stream(S, Stream, C0) ->
    Transport = gen_tcp,
    rabbit_stream_SUITE:test_create_stream(Transport, S, Stream, C0).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
2022-01-27 16:39:35 +08:00
|
|
|
%% Subscribes to Stream with subscription id SubId and the given
%% subscription properties over the plain TCP socket S, expecting an OK
%% response code. Delegates to rabbit_stream_SUITE:test_subscribe/7.
subscribe(S, SubId, Stream, SubProperties, C) ->
    rabbit_stream_SUITE:test_subscribe(gen_tcp, S, SubId, Stream,
                                       SubProperties, ?RESPONSE_CODE_OK, C).
|
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Subscribes to Stream with subscription id SubId over the plain TCP
%% socket S. Delegates to rabbit_stream_SUITE:test_subscribe/5.
subscribe(S, SubId, Stream, C) ->
    Transport = gen_tcp,
    rabbit_stream_SUITE:test_subscribe(Transport, S, SubId, Stream, C).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Declares publisher PubId on Stream over the plain TCP socket S.
%% Delegates to rabbit_stream_SUITE:test_declare_publisher/5.
declare_publisher(S, PubId, Stream, C) ->
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PubId, Stream, C).
|
2021-01-19 21:49:30 +08:00
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Deletes Stream over the plain TCP socket S by delegating to
%% rabbit_stream_SUITE:test_delete_stream/4.
delete_stream(S, Stream, C) ->
    Transport = gen_tcp,
    rabbit_stream_SUITE:test_delete_stream(Transport, S, Stream, C).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
2023-10-05 22:01:18 +08:00
|
|
|
%% Deletes Stream over the plain TCP socket S, passing `false` as the last
%% argument of rabbit_stream_SUITE:test_delete_stream/5.
%% NOTE(review): judging by this function's name the flag disables waiting
%% for the metadata update frame - confirm against rabbit_stream_SUITE.
delete_stream_no_metadata_update(S, Stream, C) ->
    Transport = gen_tcp,
    rabbit_stream_SUITE:test_delete_stream(Transport, S, Stream, C, false).
|
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Delegates to rabbit_stream_SUITE:test_metadata_update_stream_deleted/4
%% for Stream on the plain TCP socket S.
metadata_update_stream_deleted(S, Stream, C) ->
    rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, S, Stream, C).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
2021-05-13 19:58:54 +08:00
|
|
|
%% Closes the stream connection held by the plain TCP socket S by delegating
%% to rabbit_stream_SUITE:test_close/3.
close(S, C) ->
    Transport = gen_tcp,
    rabbit_stream_SUITE:test_close(Transport, S, C).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Builds the option map used by the connection/consumer/publisher counting
%% helpers: it targets broker node 0 with a 10 second timeout and verbose
%% output disabled.
options(Config) ->
    #{node => rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
      timeout => 10000,
      verbose => false,
      %% just for list_consumers and list_publishers
      vhost => <<"/">>}.
|
|
|
|
|
2022-01-27 16:39:35 +08:00
|
|
|
%% Collects the records found in a (possibly nested) list of command results.
%% A record is a list whose first element is a 2-tuple; any other element is
%% treated as a nested list and traversed recursively. Acc is the accumulator
%% (pass [] initially).
%%
%% Note: every nested traversal reverses the accumulator when it reaches the
%% end of its list, so records coming from nested lists are all returned but
%% not necessarily in input order.
flatten_command_result([], Collected) ->
    lists:reverse(Collected);
flatten_command_result([[{_, _} | _] = Record | Remaining], Collected) ->
    flatten_command_result(Remaining, [Record | Collected]);
flatten_command_result([Nested | Remaining], Collected) ->
    flatten_command_result(Remaining,
                           flatten_command_result(Nested, Collected)).
|
|
|
|
|
2021-01-19 18:31:39 +08:00
|
|
|
%% Turns the enumerable returned by a CLI command into a flat list of
%% records. Results can come from different connections, hence the
%% flattening step.
to_list(CommandRun) ->
    flatten_command_result('Elixir.Enum':to_list(CommandRun), []).
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Returns the number of records contained in the result of a CLI command.
command_result_count(CommandRun) ->
    Records = to_list(CommandRun),
    length(Records).
|
|
|
|
|
|
|
|
%% Returns the number of records reported by the "list stream connections"
%% command (queried for the conn_name column) on broker node 0.
connection_count(Config) ->
    Result = ?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], options(Config)),
    command_result_count(Result).
|
|
|
|
|
|
|
|
%% Returns the number of records reported by the "list stream consumers"
%% command (queried for the stream column) on broker node 0.
consumer_count(Config) ->
    Result = ?COMMAND_LIST_CONSUMERS:run([<<"stream">>], options(Config)),
    command_result_count(Result).
|
|
|
|
|
2021-01-19 21:49:30 +08:00
|
|
|
%% Returns the number of records reported by the "list stream publishers"
%% command (queried for the stream column) on broker node 0.
publisher_count(Config) ->
    Result = ?COMMAND_LIST_PUBLISHERS:run([<<"stream">>], options(Config)),
    command_result_count(Result).
|
|
|
|
|
2021-01-19 18:31:39 +08:00
|
|
|
%% Opens an authenticated stream connection to localhost:Port over plain TCP.
%% Returns {Socket, ConnectionState}.
start_stream_connection(Port) ->
    Transport = gen_tcp,
    start_stream_connection(Transport, Port).
|
|
|
|
|
|
|
|
%% Opens an authenticated stream connection to localhost:Port over TLS.
%% Returns {Socket, ConnectionState}.
start_stream_tls_connection(Port) ->
    Transport = ssl,
    start_stream_connection(Transport, Port).
|
|
|
|
|
|
|
|
%% Connects to localhost:Port with the given transport module (gen_tcp or
%% ssl), then runs the peer properties exchange and the authentication
%% through the rabbit_stream_SUITE helpers.
%% With ssl, peer certificate verification is turned off (verify_none).
%% Returns {Socket, ConnectionState}; crashes if the connection fails.
start_stream_connection(Transport, Port) ->
    SocketOpts = [{active, false}, {mode, binary}],
    ConnectOpts =
        if
            Transport =:= ssl -> SocketOpts ++ [{verify, verify_none}];
            true -> SocketOpts ++ []
        end,
    {ok, Socket} = Transport:connect("localhost", Port, ConnectOpts),
    Initial = rabbit_stream_core:init(0),
    AfterPeerProperties =
        rabbit_stream_SUITE:test_peer_properties(Transport, Socket, Initial),
    Authenticated =
        rabbit_stream_SUITE:test_authenticate(Transport, Socket,
                                              AfterPeerProperties),
    {Socket, Authenticated}.
|
2021-01-19 18:31:39 +08:00
|
|
|
|
|
|
|
%% Starts an AMQP connection of the given Type (network or direct) and
%% returns {ok, Connection}; crashes if the connection cannot be started.
start_amqp_connection(Type, Node, Port) ->
    {ok, _} = amqp_connection:start(amqp_params(Type, Node, Port)).
|
|
|
|
|
|
|
|
%% Builds the AMQP client connection parameters: a direct connection only
%% needs the target node, a network connection only needs the port.
amqp_params(direct, Node, _Port) ->
    #amqp_params_direct{node = Node};
amqp_params(network, _Node, Port) ->
    #amqp_params_network{port = Port}.
|
2021-09-30 16:43:29 +08:00
|
|
|
|
|
|
|
%% Looks up queue Q in the <<"/">> virtual host by calling
%% rabbit_amqqueue:lookup/1 over RPC on broker node 0 and returns the
%% RPC result unchanged.
queue_lookup(Config, Q) ->
    Resource = rabbit_misc:r(<<"/">>, queue, Q),
    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [Resource]).
|
2023-10-05 22:01:18 +08:00
|
|
|
|
|
|
|
%% Sends a store_offset frame for Reference/Stream with Value over the
%% gen_tcp socket S, then polls (up to 20 attempts) until a query for
%% the offset returns Value.
%%
%% Returns ok once the value is observed, or {error, offset_not_stored}
%% when polling gives up. The send is asserted, and the polling helper
%% asserts on the response shape, so either can crash instead.
store_offset(S, Stream, Reference, Value, C) ->
    Frame = rabbit_stream_core:frame({store_offset, Reference, Stream, Value}),
    ok = gen_tcp:send(S, Frame),
    Outcome = check_stored_offset(S, Stream, Reference, Value, C, 20),
    case Outcome of
        ok -> ok;
        _ -> {error, offset_not_stored}
    end.
|
|
|
|
|
2025-07-01 17:16:48 +08:00
|
|
|
|
|
|
|
%% Polls for the stored offset of Reference on Stream with the default
%% budget of 20 attempts. See check_stored_offset/6 for return values.
check_stored_offset(S, Stream, Reference, Expected, C) ->
    MaxAttempts = 20,
    check_stored_offset(S, Stream, Reference, Expected, C, MaxAttempts).
|
|
|
|
|
2023-10-05 22:01:18 +08:00
|
|
|
%% Repeatedly queries the offset stored for Reference on Stream over
%% the gen_tcp socket S until it equals Expected.
%%
%% Returns ok when the queried value matches Expected, or error when the
%% attempt counter reaches 0. Each failed attempt is followed by a 50 ms
%% sleep. Every response must be an OK query_offset response to request
%% 1, otherwise the ?assertMatch fails the test.
%%
%% The second element returned by receive_commands is ignored, so every
%% attempt is made with the same C.
check_stored_offset(_, _, _, _, _, 0) ->
    error;
check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
    Request = {request, 1, {query_offset, Reference, Stream}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Request)),
    {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
    ?assertMatch({response, 1, {query_offset, ?RESPONSE_CODE_OK, _}}, Cmd),
    %% The assertion above guarantees the shape of Cmd, so the only way
    %% to reach the fallback branch is a value different from Expected.
    case Cmd of
        {response, 1, {query_offset, ?RESPONSE_CODE_OK, Expected}} ->
            ok;
        _ ->
            timer:sleep(50),
            check_stored_offset(S, Stream, Reference, Expected, C, Attempt - 1)
    end.
|
|
|
|
|
|
|
|
%% Polls for the publisher sequence of Reference on Stream with the
%% default budget of 20 attempts. See check_publisher_sequence/6 for
%% return values.
check_publisher_sequence(S, Stream, Reference, Expected, C) ->
    MaxAttempts = 20,
    check_publisher_sequence(S, Stream, Reference, Expected, C, MaxAttempts).
|
|
|
|
|
|
|
|
%% Repeatedly queries the publisher sequence for Reference on Stream
%% over the gen_tcp socket S until it equals Expected.
%%
%% Returns ok when the queried value matches Expected, or error when the
%% attempt counter reaches 0. Each failed attempt is followed by a 50 ms
%% sleep. Every response must be a query_publisher_sequence response to
%% request 1 (any response code), otherwise the ?assertMatch fails the
%% test.
%%
%% The second element returned by receive_commands is ignored, so every
%% attempt is made with the same C.
check_publisher_sequence(_, _, _, _, _, 0) ->
    error;
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
    Request = {request, 1, {query_publisher_sequence, Reference, Stream}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Request)),
    {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
    ?assertMatch({response, 1, {query_publisher_sequence, _, _}}, Cmd),
    %% The assertion above guarantees the shape of Cmd, so the only way
    %% to reach the fallback branch is a value different from Expected.
    case Cmd of
        {response, 1, {query_publisher_sequence, _, Expected}} ->
            ok;
        _ ->
            timer:sleep(50),
            check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1)
    end.
|
|
|
|
|
2025-07-01 17:16:48 +08:00
|
|
|
%% Returns a binary made of L copies of the byte $a
%% (e.g. gen_bin(3) =:= <<"aaa">>, gen_bin(0) =:= <<>>).
%% L must be a non-negative integer.
gen_bin(L) ->
    %% binary:copy/2 builds the result directly, without first
    %% materialising a list of L one-character strings.
    binary:copy(<<"a">>, L).
|