rabbitmq-server/deps/rabbitmq_stream/test/commands_SUITE.erl

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

996 lines
38 KiB
Erlang
Raw Permalink Normal View History

2021-01-19 18:31:39 +08:00
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
2024-01-23 12:44:47 +08:00
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
2021-01-19 18:31:39 +08:00
%%
-module(commands_SUITE).
2021-05-13 19:58:54 +08:00
-compile(nowarn_export_all).
2021-01-19 18:31:39 +08:00
-compile([export_all]).
2021-05-13 19:58:54 +08:00
% -include_lib("common_test/include/ct.hrl").
2021-01-19 18:31:39 +08:00
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
2021-06-11 22:58:17 +08:00
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
2021-01-19 18:31:39 +08:00
%% Timeout handed to ?awaitMatch while polling broker state
%% (presumably milliseconds -- confirm against rabbit_assert.hrl).
-define(WAIT, 5000).

%% CLI command modules exercised by this suite.
-define(COMMAND_LIST_CONNECTIONS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-define(COMMAND_LIST_CONSUMERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
-define(COMMAND_LIST_PUBLISHERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
-define(COMMAND_ADD_SUPER_STREAM,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
-define(COMMAND_DELETE_SUPER_STREAM_CLI,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
-define(COMMAND_LIST_CONSUMER_GROUPS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand').
-define(COMMAND_LIST_GROUP_CONSUMERS,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').
-define(COMMAND_LIST_STREAM_TRACKING,
        'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
2021-01-19 18:31:39 +08:00
%% Common Test callback: the test groups of this suite, in declaration order.
all() ->
    [{group, Group}
     || Group <- [list_connections,
                  list_consumers,
                  list_publishers,
                  list_consumer_groups,
                  list_group_consumers,
                  list_stream_tracking,
                  super_streams]].
2021-01-19 18:31:39 +08:00
%% Common Test callback: maps each group name to its (property-less) list of
%% test cases.
groups() ->
    Connections = [list_connections_merge_defaults,
                   list_connections_run,
                   list_tls_connections_run],
    Consumers = [list_consumers_merge_defaults,
                 list_consumers_run],
    Publishers = [list_publishers_merge_defaults,
                  list_publishers_run],
    ConsumerGroups = [list_consumer_groups_validate,
                      list_consumer_groups_merge_defaults,
                      list_consumer_groups_run],
    GroupConsumers = [list_group_consumers_validate,
                      list_group_consumers_merge_defaults,
                      list_group_consumers_run],
    StreamTracking = [list_stream_tracking_validate,
                      list_stream_tracking_merge_defaults,
                      list_stream_tracking_run],
    SuperStreams = [add_super_stream_merge_defaults,
                    add_super_stream_validate,
                    delete_super_stream_merge_defaults,
                    delete_super_stream_validate,
                    add_delete_super_stream_run],
    [{list_connections, [], Connections},
     {list_consumers, [], Consumers},
     {list_publishers, [], Publishers},
     {list_consumer_groups, [], ConsumerGroups},
     {list_group_consumers, [], GroupConsumers},
     {list_stream_tracking, [], StreamTracking},
     {super_streams, [], SuperStreams}].
2021-01-19 18:31:39 +08:00
%% Suite setup. Skips the whole suite on mixed-version clusters; otherwise
%% tags the node name with the suite module, sets rabbitmq_ct_tls_verify to
%% verify_none, logs the environment and runs the broker setup steps.
init_per_suite(Config) ->
    case rabbit_ct_helpers:is_mixed_versions() of
        true ->
            {skip, "mixed version clusters are not supported for this suite"};
        _ ->
            WithSuffix =
                rabbit_ct_helpers:set_config(Config,
                                             [{rmq_nodename_suffix, ?MODULE}]),
            WithTlsVerify =
                rabbit_ct_helpers:set_config(WithSuffix,
                                             {rabbitmq_ct_tls_verify,
                                              verify_none}),
            rabbit_ct_helpers:log_environment(),
            SetupSteps = rabbit_ct_broker_helpers:setup_steps(),
            rabbit_ct_helpers:run_setup_steps(WithTlsVerify, SetupSteps)
    end.
2021-01-19 18:31:39 +08:00
%% Suite teardown: runs the broker teardown steps.
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
%% No per-group setup: the configuration is passed through untouched.
init_per_group(_Group, Config) ->
    Config.
%% No per-group teardown: the configuration is passed through untouched.
end_per_group(_Group, Config) ->
    Config.
%% Per-testcase setup: delegates to rabbit_ct_helpers:testcase_started/2.
init_per_testcase(TestCase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, TestCase).
%% Per-testcase teardown: delegates to rabbit_ct_helpers:testcase_finished/2.
end_per_testcase(TestCase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, TestCase).
%% merge_defaults/2 of the list-connections command: with no arguments the
%% keys default to node and conn_name and verbose defaults to false;
%% explicit keys and an explicit verbose flag are kept as given.
list_connections_merge_defaults(_Config) ->
    Defaults = ?COMMAND_LIST_CONNECTIONS:merge_defaults([], #{}),
    {[<<"node">>, <<"conn_name">>], #{verbose := false}} = Defaults,

    OtherKey = [<<"other_key">>],
    {OtherKey, #{verbose := true}} =
        ?COMMAND_LIST_CONNECTIONS:merge_defaults(OtherKey,
                                                 #{verbose => true}),
    {OtherKey, #{verbose := false}} =
        ?COMMAND_LIST_CONNECTIONS:merge_defaults(OtherKey,
                                                 #{verbose => false}).
%% Runs the list-connections command against a live broker: no connection,
%% then one and two plain stream connections, then checks that AMQP 0-9-1
%% connections are not listed and that verbose output equals asking for
%% every info item explicitly.
%%
%% Every connection opened here is closed before returning, and the test
%% waits until the command reports no stream connection any more, because
%% later test cases start by asserting an empty listing.
list_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          verbose => false},

    %% No connections
    [] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),

    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    [[{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    [[{ssl, false}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),

    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),

    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),

    Port =
        rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
    {ok, NetworkConn} = start_amqp_connection(network, Node, Port),

    %% There are still just two connections
    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),

    {ok, DirectConn} = start_amqp_connection(direct, Node, Port),

    %% Still two stream connections, one direct AMQP 0-9-1 connection
    [[{conn_name, _}], [{conn_name, _}]] =
        to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),

    %% Verbose returns all keys
    Infos = [atom_to_binary(Item, utf8) || Item <- ?INFO_ITEMS],
    AllKeys = to_list(?COMMAND_LIST_CONNECTIONS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),

    %% There are two connections
    [First, _Second] = AllKeys,

    %% Keys are INFO_ITEMS
    ?assertEqual(length(?INFO_ITEMS), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- ?INFO_ITEMS),
    ?assertEqual([], ?INFO_ITEMS -- Keys),

    rabbit_stream_SUITE:test_close(gen_tcp, S1, C1),
    rabbit_stream_SUITE:test_close(gen_tcp, S2, C2),

    %% Do not leave the AMQP connections open for the rest of the suite.
    %% The result is deliberately not matched: this is best-effort cleanup.
    amqp_connection:close(NetworkConn),
    amqp_connection:close(DirectConn),

    %% Leave the broker without stream connections for the next test case.
    ?awaitMatch(0, connection_count(Config), ?WAIT),
    ok.
%% Same scenario as the plain-TCP listing, over one TLS stream connection:
%% the connection must be listed and must report {ssl, true}.
list_tls_connections_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false},

    %% No connections
    [] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),

    TlsPort = rabbit_stream_SUITE:get_stream_port_tls(Config),
    %% The ssl application must be running before a TLS socket is opened.
    application:ensure_all_started(ssl),

    {Sock, State} = start_stream_tls_connection(TlsPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    ByName = to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
    [[{conn_name, _}]] = ByName,
    BySsl = to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),
    [[{ssl, true}]] = BySsl,

    rabbit_stream_SUITE:test_close(ssl, Sock, State),
    ok.
2021-01-19 18:31:39 +08:00
%% merge_defaults/2 of the list-consumers command: the default keys are all
%% consumer info items except connection_pid and node, and verbose defaults
%% to false; explicit keys and an explicit verbose flag are kept as given.
list_consumers_merge_defaults(_Config) ->
    Hidden = [connection_pid, node],
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?CONSUMER_INFO_ITEMS -- Hidden),
    {DefaultItems, #{verbose := false}} =
        ?COMMAND_LIST_CONSUMERS:merge_defaults([], #{}),

    OtherKey = [<<"other_key">>],
    {OtherKey, #{verbose := true}} =
        ?COMMAND_LIST_CONSUMERS:merge_defaults(OtherKey, #{verbose => true}),
    {OtherKey, #{verbose := false}} =
        ?COMMAND_LIST_CONSUMERS:merge_defaults(OtherKey, #{verbose => false}).
%% Runs the list-consumers command against a live broker: two connections
%% subscribe to the same stream, the verbose output must equal asking for
%% every consumer info item explicitly, and no consumer must remain once
%% the stream is deleted and the connections are closed.
%%
%% The connection state returned by each protocol helper is threaded into
%% the next call, including the one returned after the second connection
%% receives the stream-deleted metadata update.
list_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          verbose => false,
          vhost => <<"/">>},

    %% No connections, no consumers
    [] = to_list(?COMMAND_LIST_CONSUMERS:run([], Opts)),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    Stream = <<"list_consumers_run">>,
    C1_1 = create_stream(S1, Stream, C1),
    SubId = 42,
    C1_2 = subscribe(S1, SubId, Stream, C1_1),

    ?awaitMatch(1, consumer_count(Config), ?WAIT),

    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    C2_1 = subscribe(S2, SubId, Stream, C2),

    ?awaitMatch(2, consumer_count(Config), ?WAIT),

    %% Verbose returns all keys
    InfoItems = ?CONSUMER_INFO_ITEMS,
    Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
    AllKeys = to_list(?COMMAND_LIST_CONSUMERS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONSUMERS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),

    %% There are two consumers
    [First, _Second] = AllKeys,

    %% Keys are info items
    ?assertEqual(length(InfoItems), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),

    C1_3 = delete_stream(S1, Stream, C1_2),
    %% Only the second connection reads the metadata update explicitly
    %% (presumably delete_stream/3 already consumes the one sent to S1 --
    %% confirm in rabbit_stream_SUITE).
    C2_2 = metadata_update_stream_deleted(S2, Stream, C2_1),
    close(S1, C1_3),
    close(S2, C2_2),
    ?awaitMatch(0, consumer_count(Config), ?WAIT),
    ok.
2021-01-19 21:49:30 +08:00
%% merge_defaults/2 of the list-publishers command: the default keys are all
%% publisher info items except connection_pid and node, and verbose defaults
%% to false; explicit keys and an explicit verbose flag are kept as given.
list_publishers_merge_defaults(_Config) ->
    Hidden = [connection_pid, node],
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?PUBLISHER_INFO_ITEMS -- Hidden),
    {DefaultItems, #{verbose := false}} =
        ?COMMAND_LIST_PUBLISHERS:merge_defaults([], #{}),

    OtherKey = [<<"other_key">>],
    {OtherKey, #{verbose := true}} =
        ?COMMAND_LIST_PUBLISHERS:merge_defaults(OtherKey, #{verbose => true}),
    {OtherKey, #{verbose := false}} =
        ?COMMAND_LIST_PUBLISHERS:merge_defaults(OtherKey, #{verbose => false}).
%% Runs the list-publishers command against a live broker: two connections
%% declare a publisher on the same stream, the verbose output must equal
%% asking for every publisher info item explicitly, and no publisher must
%% remain once the stream is deleted and the connections are closed.
list_publishers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             verbose => false,
             vhost => <<"/">>},

    %% No connections, no publishers
    [] = to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts)),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    Stream = <<"list_publishers_run">>,
    PubId = 42,

    %% First connection: creates the stream and declares a publisher.
    {SockA, StateA0} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    StateA1 = create_stream(SockA, Stream, StateA0),
    StateA2 = declare_publisher(SockA, PubId, Stream, StateA1),
    ?awaitMatch(1, publisher_count(Config), ?WAIT),

    %% Second connection: another publisher on the same stream.
    {SockB, StateB0} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    StateB1 = declare_publisher(SockB, PubId, Stream, StateB0),
    ?awaitMatch(2, publisher_count(Config), ?WAIT),

    %% Verbose returns all keys
    Items = ?PUBLISHER_INFO_ITEMS,
    ItemNames = [atom_to_binary(Item, utf8) || Item <- Items],
    Explicit = to_list(?COMMAND_LIST_PUBLISHERS:run(ItemNames, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts#{verbose => true})),
    ?assertEqual(Explicit, Verbose),

    %% There are two publishers
    [FirstPublisher, _] = Explicit,

    %% Keys are info items
    ?assertEqual(length(Items), length(FirstPublisher)),
    {ReportedKeys, _} = lists:unzip(FirstPublisher),
    ?assertEqual([], ReportedKeys -- Items),
    ?assertEqual([], Items -- ReportedKeys),

    StateA3 = delete_stream(SockA, Stream, StateA2),
    StateB2 = metadata_update_stream_deleted(SockB, Stream, StateB1),
    close(SockA, StateA3),
    close(SockB, StateB2),
    ?awaitMatch(0, publisher_count(Config), ?WAIT),
    ok.
%% validate/2 of the list-consumer-groups command: an unknown info key is
%% rejected; a known key and an empty key list are accepted.
list_consumer_groups_validate(_) ->
    ValidOpts = #{vhost => <<"/">>},
    BadKey = ?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"foo">>], ValidOpts),
    ?assertMatch({validation_failure, {bad_info_key, [foo]}}, BadKey),
    KnownKey =
        ?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"reference">>], ValidOpts),
    ?assertMatch(ok, KnownKey),
    NoKey = ?COMMAND_LIST_CONSUMER_GROUPS:validate([], ValidOpts),
    ?assertMatch(ok, NoKey).
%% merge_defaults/2 of the list-consumer-groups command: the default keys
%% are all consumer-group info items, verbose defaults to false and the
%% vhost to <<"/">>; explicit keys and verbose flag are kept as given.
list_consumer_groups_merge_defaults(_Config) ->
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?CONSUMER_GROUP_INFO_ITEMS),
    {DefaultItems, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([], #{}),

    OtherKey = [<<"other_key">>],
    {OtherKey, #{verbose := true, vhost := <<"/">>}} =
        ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults(OtherKey,
                                                     #{verbose => true}),
    {OtherKey, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults(OtherKey,
                                                     #{verbose => false}).
%% Runs the list-consumer-groups command against a live broker: one
%% connection registers three single-active consumers with the same
%% reference on each of two streams; each stream must be listed as one
%% group of three consumers with partition index -1, and no group must
%% remain after the streams are deleted and the connection is closed.
list_consumer_groups_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>,
             verbose => true},

    %% No connections, no consumers
    {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {Sock, State} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    Reference = <<"foo">>,
    SacProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => Reference},

    FirstStream = <<"list_consumer_groups_run_1">>,
    create_stream(Sock, FirstStream, State),
    subscribe(Sock, 0, FirstStream, SacProperties, State),
    %% Answer the consumer_update request sent for subscription 0.
    handle_consumer_update(Sock, State, 0),
    subscribe(Sock, 1, FirstStream, SacProperties, State),
    subscribe(Sock, 2, FirstStream, SacProperties, State),
    ?awaitMatch(3, consumer_count(Config), ?WAIT),

    {ok, [Group1]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    assertConsumerGroup(FirstStream, Reference, -1, 3, Group1),

    SecondStream = <<"list_consumer_groups_run_2">>,
    create_stream(Sock, SecondStream, State),
    subscribe(Sock, 3, SecondStream, SacProperties, State),
    %% Answer the consumer_update request sent for subscription 3.
    handle_consumer_update(Sock, State, 3),
    subscribe(Sock, 4, SecondStream, SacProperties, State),
    subscribe(Sock, 5, SecondStream, SacProperties, State),
    ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),

    %% Group1 is already bound: the first group must be reported unchanged.
    {ok, [Group1, Group2]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    assertConsumerGroup(FirstStream, Reference, -1, 3, Group1),
    assertConsumerGroup(SecondStream, Reference, -1, 3, Group2),

    delete_stream(Sock, FirstStream, State),
    delete_stream(Sock, SecondStream, State),
    close(Sock, State),

    {ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
    ok.
%% validate/2 of the list-group-consumers command: options without stream
%% and reference are rejected as not_enough_args, an unknown info key is
%% rejected, and a known key or an empty key list is accepted.
list_group_consumers_validate(_) ->
    ValidOpts = #{vhost => <<"/">>,
                  stream => <<"s1">>,
                  reference => <<"foo">>},

    NoOpts = ?COMMAND_LIST_GROUP_CONSUMERS:validate([], #{}),
    ?assertMatch({validation_failure, not_enough_args}, NoOpts),

    VhostOnly =
        ?COMMAND_LIST_GROUP_CONSUMERS:validate([], #{vhost => <<"test">>}),
    ?assertMatch({validation_failure, not_enough_args}, VhostOnly),

    BadKey = ?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"foo">>], ValidOpts),
    ?assertMatch({validation_failure, {bad_info_key, [foo]}}, BadKey),

    KnownKey =
        ?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"subscription_id">>],
                                               ValidOpts),
    ?assertMatch(ok, KnownKey),

    NoKey = ?COMMAND_LIST_GROUP_CONSUMERS:validate([], ValidOpts),
    ?assertMatch(ok, NoKey).
%% merge_defaults/2 of the list-group-consumers command: the default keys
%% are all group-consumer info items, verbose defaults to false and the
%% vhost to <<"/">>; explicit keys and verbose flag are kept as given.
list_group_consumers_merge_defaults(_Config) ->
    DefaultItems =
        lists:map(fun rabbit_data_coercion:to_binary/1,
                  ?GROUP_CONSUMER_INFO_ITEMS),
    {DefaultItems, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([], #{}),

    OtherKey = [<<"other_key">>],
    {OtherKey, #{verbose := true, vhost := <<"/">>}} =
        ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults(OtherKey,
                                                     #{verbose => true}),
    {OtherKey, #{verbose := false, vhost := <<"/">>}} =
        ?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults(OtherKey,
                                                     #{verbose => false}).
%% Runs the list-group-consumers command against a live broker: an unknown
%% group yields {error, not_found}; after three single-active consumers
%% subscribe to a stream, the first subscription is listed as active and
%% the other two as inactive. The same is checked on a second stream, and
%% the group is gone again once its stream is deleted.
list_group_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>,
             verbose => false},
    Args = [<<"subscription_id">>, <<"state">>],
    Reference = <<"foo">>,

    FirstStream = <<"list_group_consumers_run_1">>,
    %% Opts has neither key, so this is the same map maps:merge/2 would build.
    FirstGroupOpts = Opts#{stream => FirstStream, reference => Reference},

    %% the group does not exist yet
    {error, not_found} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, FirstGroupOpts),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {Sock, State} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    SacProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => Reference},

    create_stream(Sock, FirstStream, State),
    subscribe(Sock, 0, FirstStream, SacProperties, State),
    %% Answer the consumer_update request sent for subscription 0.
    handle_consumer_update(Sock, State, 0),
    subscribe(Sock, 1, FirstStream, SacProperties, State),
    subscribe(Sock, 2, FirstStream, SacProperties, State),
    ?awaitMatch(3, consumer_count(Config), ?WAIT),

    {ok, FirstGroup} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, FirstGroupOpts),
    ?assertEqual([[{subscription_id, 0}, {state, active}],
                  [{subscription_id, 1}, {state, inactive}],
                  [{subscription_id, 2}, {state, inactive}]],
                 FirstGroup),

    SecondStream = <<"list_group_consumers_run_2">>,
    SecondGroupOpts = Opts#{stream => SecondStream, reference => Reference},

    create_stream(Sock, SecondStream, State),
    subscribe(Sock, 3, SecondStream, SacProperties, State),
    %% Answer the consumer_update request sent for subscription 3.
    handle_consumer_update(Sock, State, 3),
    subscribe(Sock, 4, SecondStream, SacProperties, State),
    subscribe(Sock, 5, SecondStream, SacProperties, State),
    ?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),

    {ok, SecondGroup} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, SecondGroupOpts),
    ?assertEqual([[{subscription_id, 3}, {state, active}],
                  [{subscription_id, 4}, {state, inactive}],
                  [{subscription_id, 5}, {state, inactive}]],
                 SecondGroup),

    delete_stream(Sock, FirstStream, State),
    delete_stream(Sock, SecondStream, State),

    {error, not_found} =
        ?COMMAND_LIST_GROUP_CONSUMERS:run(Args, SecondGroupOpts),

    close(Sock, State),
    ok.
%% Reads the next command from socket S, which must be a consumer_update
%% request for SubId with the flag set to true, and answers it with an OK
%% response asking for `next`. Returns the updated connection state.
handle_consumer_update(S, C0, SubId) ->
    {Request, C1} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0),
    {request, CorrId, {consumer_update, SubId, true}} = Request,
    Reply = {response, CorrId, {consumer_update, ?RESPONSE_CODE_OK, next}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Reply)),
    C1.
%% Asserts that the proplist Record describes a consumer group with stream
%% S, reference R, partition index PI and Cs consumers. Returns ok.
assertConsumerGroup(S, R, PI, Cs, Record) ->
    Field = fun(Key) -> proplists:get_value(Key, Record) end,
    ?assertEqual(S, Field(stream)),
    ?assertEqual(R, Field(reference)),
    ?assertEqual(PI, Field(partition_index)),
    ?assertEqual(Cs, Field(consumers)),
    ok.
2023-10-05 22:01:18 +08:00
%% validate/2 of the list-stream-tracking command: a stream name is
%% required, at most one of --all, --offset, --writer may be given, extra
%% positional arguments are rejected, and one stream plus a single flag is
%% accepted.
list_stream_tracking_validate(_) ->
    %% The flag is the atom key `writer`, like every other option in this
    %% suite; a binary key would never reach the command as the writer flag.
    ValidOpts = #{vhost => <<"/">>, writer => true},
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_LIST_STREAM_TRACKING:validate([], #{})),
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_LIST_STREAM_TRACKING:validate([],
                                                        #{vhost =>
                                                              <<"test">>})),
    ?assertMatch({validation_failure,
                  "Specify only one of --all, --offset, --writer."},
                 ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>],
                                                        #{all => true,
                                                          writer => true})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>,
                                                         <<"bad">>],
                                                        ValidOpts)),
    ?assertMatch(ok,
                 ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>],
                                                        ValidOpts)).
%% merge_defaults/2 of the list-stream-tracking command: the vhost defaults
%% to <<"/">>, `all` is set when no tracking flag is given, and an explicit
%% writer flag or vhost is kept.
list_stream_tracking_merge_defaults(_Config) ->
    Args = [<<"s">>],

    NoOpts = ?COMMAND_LIST_STREAM_TRACKING:merge_defaults(Args, #{}),
    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"/">>}}, NoOpts),

    WriterOnly =
        ?COMMAND_LIST_STREAM_TRACKING:merge_defaults(Args,
                                                     #{writer => true}),
    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"/">>}},
                 WriterOnly),

    VhostOnly =
        ?COMMAND_LIST_STREAM_TRACKING:merge_defaults(Args,
                                                     #{vhost => <<"dev">>}),
    ?assertMatch({[<<"s">>], #{all := true, vhost := <<"dev">>}}, VhostOnly),

    WriterAndVhost =
        ?COMMAND_LIST_STREAM_TRACKING:merge_defaults(Args,
                                                     #{writer => true,
                                                       vhost => <<"dev">>}),
    ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"dev">>}},
                 WriterAndVhost).
%% Runs the list-stream-tracking command against a live broker: an unknown
%% stream is an error, an empty stream has no tracking entries, stored
%% offsets appear under --all and --offset, and publisher sequences appear
%% under --all and --writer and follow later publishes.
%%
%% Both store_offset/5 calls are asserted to return ok, so a failed store is
%% reported at the point where it happens rather than by a later listing.
list_stream_tracking_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Stream = <<"list_stream_tracking_run">>,
    ConsumerReference = <<"foo">>,
    PublisherReference = <<"bar">>,
    Opts =
        #{node => Node,
          timeout => 10000,
          vhost => <<"/">>},
    Args = [Stream],

    %% the stream does not exist yet
    ?assertMatch({error, "The stream does not exist."},
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),

    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S, C} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),

    create_stream(S, Stream, C),

    ?assertMatch([],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),

    ok = store_offset(S, Stream, ConsumerReference, 42, C),

    ?assertMatch([[{type, offset}, {name, ConsumerReference},
                   {tracking_value, 42}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),

    ?assertMatch([[{type, offset}, {name, ConsumerReference},
                   {tracking_value, 42}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args,
                                                   Opts#{offset => true})),

    ok = store_offset(S, Stream, ConsumerReference, 55, C),
    ?assertMatch([[{type, offset}, {name, ConsumerReference},
                   {tracking_value, 55}]],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args,
                                                   Opts#{offset => true})),

    PublisherId = 1,
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PublisherId,
                                               PublisherReference, Stream, C),
    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 42,
                                             <<"">>, C),

    ok = check_publisher_sequence(S, Stream, PublisherReference, 42, C),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 42}],
                  [{type, offset}, {name, <<"foo">>}, {tracking_value, 55}]
                 ],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 42}]
                 ],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args,
                                                   Opts#{writer => true})),

    rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 66,
                                             <<"">>, C),

    ok = check_publisher_sequence(S, Stream, PublisherReference, 66, C),

    ?assertMatch([
                  [{type, writer}, {name, <<"bar">>}, {tracking_value, 66}]
                 ],
                 ?COMMAND_LIST_STREAM_TRACKING:run(Args,
                                                   Opts#{writer => true})),

    delete_stream(S, Stream, C),

    close(S, C),
    ok.
2021-09-24 21:18:39 +08:00
%% merge_defaults/2 of the add-super-stream command: partitions defaults to
%% 3 and the vhost to <<"/">>; an explicit partition count is kept; binding
%% keys (or routing keys, which come back as binding_keys) are kept and no
%% default partition count is added alongside them.
add_super_stream_merge_defaults(_Config) ->
    Args = [<<"super-stream">>],
    Keys = <<"amer,emea,apac">>,

    NoOpts = ?COMMAND_ADD_SUPER_STREAM:merge_defaults(Args, #{}),
    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 3, vhost := <<"/">>}},
                 NoOpts),

    FivePartitions =
        ?COMMAND_ADD_SUPER_STREAM:merge_defaults(Args, #{partitions => 5}),
    ?assertMatch({[<<"super-stream">>],
                  #{partitions := 5, vhost := <<"/">>}},
                 FivePartitions),

    DefaultWithBindingKeys =
        ?COMMAND_ADD_SUPER_STREAM:merge_defaults(Args,
                                                 #{binding_keys => Keys}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 DefaultWithBindingKeys),
    {_, OptsBks} = DefaultWithBindingKeys,
    ?assertEqual(false, maps:is_key(partitions, OptsBks)),

    DefaultWithRoutingKeys =
        ?COMMAND_ADD_SUPER_STREAM:merge_defaults(Args,
                                                 #{routing_keys => Keys}),
    ?assertMatch({[<<"super-stream">>],
                  #{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
                 DefaultWithRoutingKeys),
    {_, OptsRks} = DefaultWithRoutingKeys,
    ?assertEqual(false, maps:is_key(partitions, OptsRks)).
2021-09-24 21:18:39 +08:00
%% validate/2 of the add-super-stream command: exactly one name is
%% required; partitions cannot be combined with routing or binding keys,
%% nor routing keys with binding keys; partitions must not be 0; and each
%% stream argument in the table below is accepted or rejected as listed.
add_super_stream_validate(_Config) ->
    Name = [<<"a">>],

    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_ADD_SUPER_STREAM:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})),

    %% Mutually exclusive options.
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{partitions => 1,
                                                      routing_keys =>
                                                          <<"a,b,c">>})),
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{partitions => 1,
                                                      binding_keys =>
                                                          <<"a,b,c">>})),
    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{routing_keys => 1,
                                                      binding_keys =>
                                                          <<"a,b,c">>})),

    ?assertMatch({validation_failure, _},
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{partitions => 0})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{partitions => 5})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{routing_keys =>
                                                          <<"a,b,c">>})),
    ?assertEqual(ok,
                 ?COMMAND_ADD_SUPER_STREAM:validate(Name,
                                                    #{binding_keys =>
                                                          <<"a,b,c">>})),

    %% {Options, ExpectedOutcome} for each stream argument.
    Cases =
        [{#{max_length_bytes => 1000}, ok},
         {#{max_length_bytes => <<"1000">>}, ok},
         {#{max_length_bytes => <<"100gb">>}, ok},
         {#{max_length_bytes => <<"50mb">>}, ok},
         {#{max_length_bytes => <<"50bm">>}, error},
         {#{max_age => <<"PT10M">>}, ok},
         {#{max_age => <<"P5DT8H">>}, ok},
         {#{max_age => <<"foo">>}, error},
         {#{stream_max_segment_size_bytes => 1000}, ok},
         {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
         {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
         {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
         {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
         {#{leader_locator => <<"client-local">>}, ok},
         {#{leader_locator => <<"least-leaders">>}, ok},
         {#{leader_locator => <<"random">>}, ok},
         {#{leader_locator => <<"foo">>}, error},
         {#{initial_cluster_size => <<"1">>}, ok},
         {#{initial_cluster_size => <<"2">>}, ok},
         {#{initial_cluster_size => <<"3">>}, ok},
         {#{initial_cluster_size => <<"0">>}, error},
         {#{initial_cluster_size => <<"-1">>}, error},
         {#{initial_cluster_size => <<"foo">>}, error}],
    lists:foreach(
      fun({Opts, ok}) ->
              ?assertEqual(ok,
                           ?COMMAND_ADD_SUPER_STREAM:validate(Name, Opts));
         ({Opts, error}) ->
              ?assertMatch({validation_failure, _},
                           ?COMMAND_ADD_SUPER_STREAM:validate(Name, Opts))
      end,
      Cases),
    ok.
2021-09-24 22:27:47 +08:00
%% merge_defaults/2 of the delete-super-stream command: the vhost defaults
%% to <<"/">> and the name argument is kept.
delete_super_stream_merge_defaults(_Config) ->
    Merged =
        ?COMMAND_DELETE_SUPER_STREAM_CLI:merge_defaults([<<"super-stream">>],
                                                        #{}),
    ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, Merged),
    ok.
%% validate/2 of the delete-super-stream command: exactly one name is
%% required. All three cases exercise the delete command itself, including
%% the success case.
delete_super_stream_validate(_Config) ->
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>, <<"b">>],
                                                           #{})),
    ?assertEqual(ok,
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>], #{})),
    ok.
%% Creates and deletes the super stream "invoices" three times against a
%% live broker: by partition count, by binding keys, and with extra stream
%% arguments whose values are then checked on the queue of the first
%% partition.
add_delete_super_stream_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts = #{node => Node,
             timeout => 10000,
             vhost => <<"/">>},
    SuperStream = <<"invoices">>,

    %% with number of partitions
    ByCount = ?COMMAND_ADD_SUPER_STREAM:run([SuperStream],
                                            maps:merge(#{partitions => 3},
                                                       Opts)),
    ?assertMatch({ok, _}, ByCount),
    ?assertEqual({ok,
                  [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
                 partitions(Config, SuperStream)),
    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([SuperStream], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, SuperStream)),

    %% with binding keys (surrounding blanks are part of the input)
    ByKeys =
        ?COMMAND_ADD_SUPER_STREAM:run([SuperStream],
                                      maps:merge(#{binding_keys =>
                                                       <<" amer,emea , apac">>},
                                                 Opts)),
    ?assertMatch({ok, _}, ByKeys),
    ?assertEqual({ok,
                  [<<"invoices-amer">>, <<"invoices-emea">>,
                   <<"invoices-apac">>]},
                 partitions(Config, SuperStream)),
    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([SuperStream], Opts)),
    ?assertEqual({error, stream_not_found},
                 partitions(Config, SuperStream)),

    %% with arguments
    ExtraOptions = #{partitions => 3,
                     max_length_bytes => <<"50mb">>,
                     max_age => <<"PT10M">>,
                     stream_max_segment_size_bytes => <<"1mb">>,
                     leader_locator => <<"random">>,
                     initial_cluster_size => <<"1">>},
    WithArguments =
        ?COMMAND_ADD_SUPER_STREAM:run([SuperStream],
                                      maps:merge(ExtraOptions, Opts)),
    ?assertMatch({ok, _}, WithArguments),

    {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
    QueueArgs = amqqueue:get_arguments(Q),
    Lookup = fun(Key) -> rabbit_misc:table_lookup(QueueArgs, Key) end,
    ?assertMatch({_, <<"random">>}, Lookup(<<"x-queue-leader-locator">>)),
    ?assertMatch({_, 1}, Lookup(<<"x-initial-cluster-size">>)),
    ?assertMatch({_, 1000000},
                 Lookup(<<"x-stream-max-segment-size-bytes">>)),
    ?assertMatch({_, <<"600s">>}, Lookup(<<"x-max-age">>)),
    ?assertMatch({_, 50000000}, Lookup(<<"x-max-length-bytes">>)),
    ?assertMatch({_, <<"stream">>}, Lookup(<<"x-queue-type">>)),

    ?assertMatch({ok, _},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:run([SuperStream], Opts)),
    ok.
2021-09-25 00:18:55 +08:00
%% Calls rabbit_stream_manager:partitions(<<"/">>, Name) on broker node 0
%% and returns its result.
partitions(Config, Name) ->
    CallArgs = [<<"/">>, Name],
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_stream_manager, partitions, CallArgs).
2021-09-24 21:18:39 +08:00
2021-05-13 19:58:54 +08:00
%% Creates Stream over plain-TCP socket S; thin wrapper around
%% rabbit_stream_SUITE:test_create_stream/4.
create_stream(Socket, Stream, State) ->
    rabbit_stream_SUITE:test_create_stream(gen_tcp, Socket, Stream, State).
2021-01-19 18:31:39 +08:00
%% Subscribes SubId to Stream with the given subscription properties over
%% plain-TCP socket S, expecting an OK response code.
subscribe(Socket, SubId, Stream, SubProperties, State) ->
    rabbit_stream_SUITE:test_subscribe(gen_tcp, Socket, SubId, Stream,
                                       SubProperties, ?RESPONSE_CODE_OK,
                                       State).
2021-05-13 19:58:54 +08:00
%% Subscribes SubId to Stream over plain-TCP socket S; thin wrapper around
%% rabbit_stream_SUITE:test_subscribe/5.
subscribe(Socket, SubId, Stream, State) ->
    rabbit_stream_SUITE:test_subscribe(gen_tcp, Socket, SubId, Stream, State).
2021-01-19 18:31:39 +08:00
2021-05-13 19:58:54 +08:00
%% Declares publisher PubId on Stream over plain-TCP socket S; thin wrapper
%% around rabbit_stream_SUITE:test_declare_publisher/5.
declare_publisher(Socket, PubId, Stream, State) ->
    rabbit_stream_SUITE:test_declare_publisher(gen_tcp, Socket, PubId,
                                               Stream, State).
2021-01-19 21:49:30 +08:00
2021-05-13 19:58:54 +08:00
%% Deletes Stream over plain-TCP socket S; thin wrapper around
%% rabbit_stream_SUITE:test_delete_stream/4.
delete_stream(Socket, Stream, State) ->
    rabbit_stream_SUITE:test_delete_stream(gen_tcp, Socket, Stream, State).
2021-01-19 18:31:39 +08:00
2023-10-05 22:01:18 +08:00
%% Deletes Stream over plain-TCP socket S through
%% rabbit_stream_SUITE:test_delete_stream/5 with `false` as last argument
%% (presumably "do not expect a metadata update" -- confirm in
%% rabbit_stream_SUITE).
delete_stream_no_metadata_update(Socket, Stream, State) ->
    rabbit_stream_SUITE:test_delete_stream(gen_tcp, Socket, Stream, State,
                                           false).
2021-05-13 19:58:54 +08:00
%% Thin wrapper around
%% rabbit_stream_SUITE:test_metadata_update_stream_deleted/4 for plain-TCP
%% socket S and Stream.
metadata_update_stream_deleted(Socket, Stream, State) ->
    rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, Socket,
                                                            Stream, State).
2021-01-19 18:31:39 +08:00
2021-05-13 19:58:54 +08:00
%% Closes the stream connection on plain-TCP socket S; thin wrapper around
%% rabbit_stream_SUITE:test_close/3.
close(Socket, State) ->
    rabbit_stream_SUITE:test_close(gen_tcp, Socket, State).
2021-01-19 18:31:39 +08:00
%% Command options used by the *_count helpers: broker node 0, a timeout of
%% 10000, non-verbose output and the default vhost.
options(Config) ->
    #{node => rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
      timeout => 10000,
      verbose => false,
      %% just for list_consumers and list_publishers
      vhost => <<"/">>}.
%% Flattens an arbitrarily nested list of command results into a flat list
%% of records, preserving the order in which the records appear. A record
%% is a non-empty list whose first element is a {Key, Value} pair.
%%
%% Acc holds already collected records in reverse order; they come first in
%% the result. The accumulator is reversed exactly once, at the end, so that
%% records from nested lists keep their relative order.
flatten_command_result(Results, Acc) ->
    lists:reverse(collect_records(Results, Acc)).

%% Prepends every record found in the (possibly nested) list to Acc.
collect_records([], Acc) ->
    Acc;
collect_records([[{_K, _V} | _RecordRest] = Record | Rest], Acc) ->
    collect_records(Rest, [Record | Acc]);
collect_records([Nested | Rest], Acc) ->
    collect_records(Rest, collect_records(Nested, Acc)).
2021-01-19 18:31:39 +08:00
%% Materialises a command result with Elixir's Enum.to_list/1 and flattens
%% it into a flat list of records.
to_list(CommandRun) ->
    %% we can get results from different connections, so we flatten out
    flatten_command_result('Elixir.Enum':to_list(CommandRun), []).
2021-01-19 18:31:39 +08:00
%% Number of records in a command result.
command_result_count(CommandRun) ->
    Records = to_list(CommandRun),
    length(Records).
%% Number of records the list-connections command currently returns.
connection_count(Config) ->
    Result = ?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>],
                                           options(Config)),
    command_result_count(Result).
%% Number of records the list-consumers command currently returns.
consumer_count(Config) ->
    Result = ?COMMAND_LIST_CONSUMERS:run([<<"stream">>], options(Config)),
    command_result_count(Result).
2021-01-19 21:49:30 +08:00
%% Number of records the list-publishers command currently returns.
publisher_count(Config) ->
    Result = ?COMMAND_LIST_PUBLISHERS:run([<<"stream">>], options(Config)),
    command_result_count(Result).
2021-01-19 18:31:39 +08:00
%% Opens an authenticated stream connection to Port over plain TCP.
start_stream_connection(StreamPort) ->
    start_stream_connection(gen_tcp, StreamPort).
%% Opens an authenticated stream connection to Port over TLS.
start_stream_tls_connection(StreamTlsPort) ->
    start_stream_connection(ssl, StreamTlsPort).
%% Connects to localhost:Port with the Transport module (gen_tcp or ssl) in
%% passive binary mode, then exchanges peer properties and authenticates.
%% TLS connections do not verify the peer certificate.
%% Returns {Socket, ConnectionState}.
start_stream_connection(Transport, Port) ->
    BaseOpts = [{active, false}, {mode, binary}],
    ConnectOpts =
        case Transport of
            ssl -> BaseOpts ++ [{verify, verify_none}];
            _ -> BaseOpts
        end,
    {ok, Socket} = Transport:connect("localhost", Port, ConnectOpts),
    Initial = rabbit_stream_core:init(0),
    WithPeerProperties =
        rabbit_stream_SUITE:test_peer_properties(Transport, Socket, Initial),
    Authenticated =
        rabbit_stream_SUITE:test_authenticate(Transport, Socket,
                                              WithPeerProperties),
    {Socket, Authenticated}.
2021-01-19 18:31:39 +08:00
%% Starts an AMQP 0-9-1 connection of the given Type (network or direct)
%% and returns {ok, Connection}; crashes if the connection cannot be
%% started.
start_amqp_connection(Type, Node, Port) ->
    {ok, _Connection} = amqp_connection:start(amqp_params(Type, Node, Port)).
%% Connection parameters per connection type: a network connection only
%% needs the port, a direct connection only the node.
amqp_params(network, _Node, Port) ->
    #amqp_params_network{port = Port};
amqp_params(direct, Node, _Port) ->
    #amqp_params_direct{node = Node}.
%% Looks up queue Q of vhost <<"/">> on broker node 0 through
%% rabbit_amqqueue:lookup/1 and returns its result.
queue_lookup(Config, Q) ->
    LookupArgs = [rabbit_misc:r(<<"/">>, queue, Q)],
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_amqqueue, lookup, LookupArgs).
2023-10-05 22:01:18 +08:00
%% Sends a store_offset command for Reference on Stream, then polls (up to
%% 20 attempts) until the broker reports Value as the stored offset.
%% Returns ok, or {error, offset_not_stored} if the value never shows up.
store_offset(S, Stream, Reference, Value, C) ->
    Frame = rabbit_stream_core:frame({store_offset, Reference, Stream, Value}),
    ok = gen_tcp:send(S, Frame),
    Outcome = check_stored_offset(S, Stream, Reference, Value, C, 20),
    if
        Outcome =:= ok ->
            ok;
        true ->
            {error, offset_not_stored}
    end.
%% Queries the stored offset of Reference on Stream until it equals
%% Expected, sleeping 50 ms between attempts. Returns ok on a match and
%% `error` once the attempts are used up. Each response must be an OK
%% query_offset response with correlation id 1.
check_stored_offset(_, _, _, _, _, 0) ->
    error;
check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
    Query = {request, 1, {query_offset, Reference, Stream}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Query)),
    {Reply, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
    ?assertMatch({response, 1, {query_offset, ?RESPONSE_CODE_OK, _}}, Reply),
    {response, 1, {query_offset, ?RESPONSE_CODE_OK, Stored}} = Reply,
    if
        Stored =:= Expected ->
            ok;
        true ->
            timer:sleep(50),
            check_stored_offset(S, Stream, Reference, Expected, C,
                                Attempt - 1)
    end.
%% Polls the publisher sequence of Reference on Stream with the default
%% budget of 20 attempts.
check_publisher_sequence(S, Stream, Reference, Expected, C) ->
    MaxAttempts = 20,
    check_publisher_sequence(S, Stream, Reference, Expected, C, MaxAttempts).
%% Queries the publisher sequence of Reference on Stream until it equals
%% Expected, sleeping 50 ms between attempts. Returns ok on a match and
%% `error` once the attempts are used up. Each response must be a
%% query_publisher_sequence response with correlation id 1.
check_publisher_sequence(_, _, _, _, _, 0) ->
    error;
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
    Query = {request, 1, {query_publisher_sequence, Reference, Stream}},
    ok = gen_tcp:send(S, rabbit_stream_core:frame(Query)),
    {Reply, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
    ?assertMatch({response, 1, {query_publisher_sequence, _, _}}, Reply),
    {response, 1, {query_publisher_sequence, _, Sequence}} = Reply,
    if
        Sequence =:= Expected ->
            ok;
        true ->
            timer:sleep(50),
            check_publisher_sequence(S, Stream, Reference, Expected, C,
                                     Attempt - 1)
    end.