%% File: rabbitmq-server/deps/rabbitmq_stream/test/commands_SUITE.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(commands_SUITE).
-compile(nowarn_export_all).
-compile([export_all]).
% -include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-define(WAIT, 5000).
-define(COMMAND_LIST_CONNECTIONS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-define(COMMAND_LIST_CONSUMERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
-define(COMMAND_LIST_PUBLISHERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
-define(COMMAND_ADD_SUPER_STREAM,
'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
-define(COMMAND_DELETE_SUPER_STREAM_CLI,
'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
-define(COMMAND_LIST_CONSUMER_GROUPS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand').
-define(COMMAND_LIST_GROUP_CONSUMERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').
-define(COMMAND_LIST_STREAM_TRACKING,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
-define(COMMAND_RESET_OFFSET,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
%% CT entry point: run every group, one group per CLI command under test.
all() ->
    [{group, G}
     || G <- [list_connections,
              list_consumers,
              list_publishers,
              list_consumer_groups,
              list_group_consumers,
              activate_consumer,
              list_stream_tracking,
              reset_offset,
              super_streams]].
%% CT group definitions: every group is sequential (empty property list)
%% and enumerates its test cases in execution order.
groups() ->
    [{Name, [], Tests}
     || {Name, Tests}
        <- [{list_connections,
             [list_connections_merge_defaults, list_connections_run,
              list_tls_connections_run]},
            {list_consumers,
             [list_consumers_merge_defaults, list_consumers_run]},
            {list_publishers,
             [list_publishers_merge_defaults, list_publishers_run]},
            {list_consumer_groups,
             [list_consumer_groups_validate,
              list_consumer_groups_merge_defaults,
              list_consumer_groups_run]},
            {list_group_consumers,
             [list_group_consumers_validate,
              list_group_consumers_merge_defaults,
              list_group_consumers_run]},
            {activate_consumer,
             [activate_consumer_validate, activate_consumer_merge_defaults,
              activate_consumer_run]},
            {list_stream_tracking,
             [list_stream_tracking_validate,
              list_stream_tracking_merge_defaults,
              list_stream_tracking_run]},
            {reset_offset,
             [reset_offset_validate, reset_offset_merge_defaults,
              reset_offset_run]},
            {super_streams,
             [add_super_stream_merge_defaults,
              add_super_stream_validate,
              delete_super_stream_merge_defaults,
              delete_super_stream_validate,
              add_delete_super_stream_run]}]].
%% Suite-wide setup. Skips the whole suite on mixed-version clusters;
%% otherwise configures a node-name suffix, disables TLS peer verification
%% (needed by list_tls_connections_run) and starts the broker.
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip,
"mixed version clusters are not supported for "
"this suite"};
_ ->
Config1 =
rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
Config2 =
rabbit_ct_helpers:set_config(Config1,
{rabbitmq_ct_tls_verify,
verify_none}),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config2,
rabbit_ct_broker_helpers:setup_steps())
end.
%% Suite-wide teardown: stops the broker started in init_per_suite/1.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
%% No per-group setup needed: every group reuses the suite-wide broker.
init_per_group(_Group, Config) ->
    Config.
%% No per-group cleanup needed; the config passes through untouched.
end_per_group(_Group, Config) ->
    Config.
%% Standard CT hook: records the test case start in the helpers' bookkeeping.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
%% Standard CT hook: records the test case end in the helpers' bookkeeping.
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% merge_defaults/2 with no args yields the default columns [node, conn_name]
%% and verbose=false; explicit columns and verbose flags pass through as-is.
list_connections_merge_defaults(_Config) ->
{[<<"node">>, <<"conn_name">>], #{verbose := false}} =
?COMMAND_LIST_CONNECTIONS:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true}} =
?COMMAND_LIST_CONNECTIONS:merge_defaults([<<"other_key">>],
#{verbose => true}),
{[<<"other_key">>], #{verbose := false}} =
?COMMAND_LIST_CONNECTIONS:merge_defaults([<<"other_key">>],
#{verbose => false}).
%% run/2 for ListStreamConnections: starts from zero connections, opens two
%% stream connections, verifies AMQP connections are NOT counted, and checks
%% that verbose mode returns exactly the ?INFO_ITEMS keys.
list_connections_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
verbose => false},
%% No connections
[] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S1, C1} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
[[{conn_name, _}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
%% plain TCP connection, so ssl reported as false
[[{ssl, false}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),
{S2, C2} = start_stream_connection(StreamPort),
?awaitMatch(2, connection_count(Config), ?WAIT),
[[{conn_name, _}], [{conn_name, _}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
Port =
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
start_amqp_connection(network, Node, Port),
%% There are still just two connections (AMQP network conn not listed)
[[{conn_name, _}], [{conn_name, _}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
start_amqp_connection(direct, Node, Port),
%% Still two stream connections, one direct AMQP 0-9-1 connection
[[{conn_name, _}], [{conn_name, _}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
%% Verbose returns all keys
Infos =
lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
AllKeys = to_list(?COMMAND_LIST_CONNECTIONS:run(Infos, Opts)),
Verbose =
to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts#{verbose => true})),
?assertEqual(AllKeys, Verbose),
%% There are two connections
[First, _Second] = AllKeys,
%% Keys are INFO_ITEMS (same set, both directions checked with --)
?assertEqual(length(?INFO_ITEMS), length(First)),
{Keys, _} = lists:unzip(First),
?assertEqual([], Keys -- ?INFO_ITEMS),
?assertEqual([], ?INFO_ITEMS -- Keys),
rabbit_stream_SUITE:test_close(gen_tcp, S1, C1),
rabbit_stream_SUITE:test_close(gen_tcp, S2, C2),
ok.
%% run/2 over a TLS stream connection: one connection is listed and its
%% ssl info item is reported as true.
list_tls_connections_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
verbose => false},
%% No connections
[] = to_list(?COMMAND_LIST_CONNECTIONS:run([], Opts)),
StreamTlsPort = rabbit_stream_SUITE:get_stream_port_tls(Config),
%% the ssl application must be running on the test node itself
application:ensure_all_started(ssl),
{S1, C1} = start_stream_tls_connection(StreamTlsPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
[[{conn_name, _}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>], Opts)),
[[{ssl, true}]] =
to_list(?COMMAND_LIST_CONNECTIONS:run([<<"ssl">>], Opts)),
rabbit_stream_SUITE:test_close(ssl, S1, C1),
ok.
%% merge_defaults/2: default columns are ?CONSUMER_INFO_ITEMS minus
%% connection_pid and node; explicit columns and verbose flags pass through.
list_consumers_merge_defaults(_Config) ->
DefaultItems =
[rabbit_data_coercion:to_binary(Item)
|| Item <- ?CONSUMER_INFO_ITEMS -- [connection_pid, node]],
{DefaultItems, #{verbose := false}} =
?COMMAND_LIST_CONSUMERS:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true}} =
?COMMAND_LIST_CONSUMERS:merge_defaults([<<"other_key">>],
#{verbose => true}),
{[<<"other_key">>], #{verbose := false}} =
?COMMAND_LIST_CONSUMERS:merge_defaults([<<"other_key">>],
#{verbose => false}).
%% run/2 for ListStreamConsumers: zero consumers initially, then two
%% consumers on one stream from two connections; verbose output must equal
%% an explicit request for all ?CONSUMER_INFO_ITEMS columns.
list_consumers_run(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Opts =
        #{node => Node,
          timeout => 10000,
          verbose => false,
          vhost => <<"/">>},
    %% No connections, no consumers
    [] = to_list(?COMMAND_LIST_CONSUMERS:run([], Opts)),
    StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
    {S1, C1} = start_stream_connection(StreamPort),
    ?awaitMatch(1, connection_count(Config), ?WAIT),
    Stream = <<"list_consumers_run">>,
    C1_1 = create_stream(S1, Stream, C1),
    SubId = 42,
    C1_2 = subscribe(S1, SubId, Stream, C1_1),
    ?awaitMatch(1, consumer_count(Config), ?WAIT),
    {S2, C2} = start_stream_connection(StreamPort),
    ?awaitMatch(2, connection_count(Config), ?WAIT),
    C2_1 = subscribe(S2, SubId, Stream, C2),
    ?awaitMatch(2, consumer_count(Config), ?WAIT),
    %% Verbose returns all keys
    InfoItems = ?CONSUMER_INFO_ITEMS,
    Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
    AllKeys = to_list(?COMMAND_LIST_CONSUMERS:run(Infos, Opts)),
    Verbose =
        to_list(?COMMAND_LIST_CONSUMERS:run([], Opts#{verbose => true})),
    ?assertEqual(AllKeys, Verbose),
    %% There are two consumers
    [First, _Second] = AllKeys,
    %% Keys are info items (set equality checked with -- in both directions)
    ?assertEqual(length(InfoItems), length(First)),
    {Keys, _} = lists:unzip(First),
    ?assertEqual([], Keys -- InfoItems),
    ?assertEqual([], InfoItems -- Keys),
    C1_3 = delete_stream(S1, Stream, C1_2),
    %% Thread the connection state returned after consuming the
    %% metadata_update frame so close/2 uses the up-to-date parser state
    %% (the original discarded it and closed with stale C2_1; this now
    %% mirrors list_publishers_run).
    C2_2 = metadata_update_stream_deleted(S2, Stream, C2_1),
    close(S1, C1_3),
    close(S2, C2_2),
    ?awaitMatch(0, consumer_count(Config), ?WAIT),
    ok.
%% merge_defaults/2: default columns are ?PUBLISHER_INFO_ITEMS minus
%% connection_pid and node; explicit columns and verbose flags pass through.
list_publishers_merge_defaults(_Config) ->
DefaultItems =
[rabbit_data_coercion:to_binary(Item)
|| Item <- ?PUBLISHER_INFO_ITEMS -- [connection_pid, node]],
{DefaultItems, #{verbose := false}} =
?COMMAND_LIST_PUBLISHERS:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true}} =
?COMMAND_LIST_PUBLISHERS:merge_defaults([<<"other_key">>],
#{verbose => true}),
{[<<"other_key">>], #{verbose := false}} =
?COMMAND_LIST_PUBLISHERS:merge_defaults([<<"other_key">>],
#{verbose => false}).
%% run/2 for ListStreamPublishers: zero publishers initially, then two
%% publishers on one stream from two connections; verbose output must equal
%% an explicit request for all ?PUBLISHER_INFO_ITEMS columns.
list_publishers_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
verbose => false,
vhost => <<"/">>},
%% No connections, no publishers
[] = to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts)),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S1, C1} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
Stream = <<"list_publishers_run">>,
C1_1 = create_stream(S1, Stream, C1),
PubId = 42,
C1_2 = declare_publisher(S1, PubId, Stream, C1_1),
?awaitMatch(1, publisher_count(Config), ?WAIT),
{S2, C2} = start_stream_connection(StreamPort),
?awaitMatch(2, connection_count(Config), ?WAIT),
C2_1 = declare_publisher(S2, PubId, Stream, C2),
?awaitMatch(2, publisher_count(Config), ?WAIT),
%% Verbose returns all keys
InfoItems = ?PUBLISHER_INFO_ITEMS,
Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
AllKeys = to_list(?COMMAND_LIST_PUBLISHERS:run(Infos, Opts)),
Verbose =
to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts#{verbose => true})),
?assertEqual(AllKeys, Verbose),
%% There are two publishers
[First, _Second] = AllKeys,
%% Keys are info items (set equality checked with -- in both directions)
?assertEqual(length(InfoItems), length(First)),
{Keys, _} = lists:unzip(First),
?assertEqual([], Keys -- InfoItems),
?assertEqual([], InfoItems -- Keys),
C1_3 = delete_stream(S1, Stream, C1_2),
% metadata_update_stream_deleted(S1, Stream),
%% consume the metadata_update on the second connection, then close both
C2_2 = metadata_update_stream_deleted(S2, Stream, C2_1),
close(S1, C1_3),
close(S2, C2_2),
?awaitMatch(0, publisher_count(Config), ?WAIT),
ok.
%% validate/2: unknown info keys are rejected; known keys and no keys pass.
list_consumer_groups_validate(_) ->
ValidOpts = #{vhost => <<"/">>},
?assertMatch({validation_failure, {bad_info_key, [foo]}},
?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"foo">>],
ValidOpts)),
?assertMatch(ok,
?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"reference">>],
ValidOpts)),
?assertMatch(ok,
?COMMAND_LIST_CONSUMER_GROUPS:validate([], ValidOpts)).
%% merge_defaults/2: defaults to all ?CONSUMER_GROUP_INFO_ITEMS columns,
%% verbose=false and vhost "/"; explicit columns/flags pass through.
list_consumer_groups_merge_defaults(_Config) ->
DefaultItems =
[rabbit_data_coercion:to_binary(Item)
|| Item <- ?CONSUMER_GROUP_INFO_ITEMS],
{DefaultItems, #{verbose := false, vhost := <<"/">>}} =
?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} =
?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([<<"other_key">>],
#{verbose => true}),
{[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} =
?COMMAND_LIST_CONSUMER_GROUPS:merge_defaults([<<"other_key">>],
#{verbose => false}).
%% run/2 for ListStreamConsumerGroups: builds two single-active-consumer
%% groups (3 subscribers each, same group name, two streams) and checks each
%% is reported with partition_index -1 (presumably meaning "not a
%% super-stream partition" -- TODO confirm) and 3 consumers.
list_consumer_groups_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
vhost => <<"/">>,
verbose => true},
%% No connections, no consumers
{ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S, C0} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
ConsumerReference = <<"foo">>,
SubProperties =
#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => ConsumerReference},
Stream1 = <<"list_consumer_groups_run_1">>,
C1 = create_stream(S, Stream1, C0),
%% first subscriber becomes active and must answer consumer_update
C2 = subscribe(S, 0, Stream1, SubProperties, C1),
C3 = handle_consumer_update(S, C2, 0),
C4 = subscribe(S, 1, Stream1, SubProperties, C3),
C5 = subscribe(S, 2, Stream1, SubProperties, C4),
?awaitMatch(3, consumer_count(Config), ?WAIT),
{ok, [CG1]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
Stream2 = <<"list_consumer_groups_run_2">>,
C6 = create_stream(S, Stream2, C5),
C7 = subscribe(S, 3, Stream2, SubProperties, C6),
C8 = handle_consumer_update(S, C7, 3),
C9 = subscribe(S, 4, Stream2, SubProperties, C8),
C10 = subscribe(S, 5, Stream2, SubProperties, C9),
?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
{ok, [CG1, CG2]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
assertConsumerGroup(Stream2, ConsumerReference, -1, 3, CG2),
C11 = delete_stream(S, Stream1, C10),
C12 = delete_stream(S, Stream2, C11),
close(S, C12),
{ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
ok.
%% validate/2: stream and reference options are mandatory; unknown info
%% keys are rejected; known keys and no keys pass.
list_group_consumers_validate(_) ->
ValidOpts =
#{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertMatch({validation_failure, not_enough_args},
?COMMAND_LIST_GROUP_CONSUMERS:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
?COMMAND_LIST_GROUP_CONSUMERS:validate([],
#{vhost =>
<<"test">>})),
?assertMatch({validation_failure, {bad_info_key, [foo]}},
?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"foo">>],
ValidOpts)),
?assertMatch(ok,
?COMMAND_LIST_GROUP_CONSUMERS:validate([<<"subscription_id">>],
ValidOpts)),
?assertMatch(ok,
?COMMAND_LIST_GROUP_CONSUMERS:validate([], ValidOpts)).
%% merge_defaults/2: defaults to all ?GROUP_CONSUMER_INFO_ITEMS columns,
%% verbose=false and vhost "/"; explicit columns/flags pass through.
list_group_consumers_merge_defaults(_Config) ->
DefaultItems =
[rabbit_data_coercion:to_binary(Item)
|| Item <- ?GROUP_CONSUMER_INFO_ITEMS],
{DefaultItems, #{verbose := false, vhost := <<"/">>}} =
?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true, vhost := <<"/">>}} =
?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([<<"other_key">>],
#{verbose => true}),
{[<<"other_key">>], #{verbose := false, vhost := <<"/">>}} =
?COMMAND_LIST_GROUP_CONSUMERS:merge_defaults([<<"other_key">>],
#{verbose => false}).
%% run/2 for ListStreamGroupConsumers: not_found for a missing group, then
%% two SAC groups where the first subscriber is active and the rest wait.
%% NOTE(review): unlike list_consumer_groups_run, the connection states
%% returned by create_stream/subscribe/delete_stream are discarded here and
%% the initial state C is reused -- confirm the stale parser state is
%% sufficient for these helpers.
list_group_consumers_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
vhost => <<"/">>,
verbose => false},
Args = [<<"subscription_id">>, <<"state">>],
Stream1 = <<"list_group_consumers_run_1">>,
ConsumerReference = <<"foo">>,
OptsGroup1 =
maps:merge(#{stream => Stream1, reference => ConsumerReference},
Opts),
%% the group does not exist yet
{error, not_found} =
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
SubProperties =
#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => ConsumerReference},
create_stream(S, Stream1, C),
subscribe(S, 0, Stream1, SubProperties, C),
handle_consumer_update(S, C, 0),
subscribe(S, 1, Stream1, SubProperties, C),
subscribe(S, 2, Stream1, SubProperties, C),
?awaitMatch(3, consumer_count(Config), ?WAIT),
{ok, Consumers1} =
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
%% first subscriber is the single active consumer, others wait
?assertEqual([[{subscription_id, 0}, {state, "active (connected)"}],
[{subscription_id, 1}, {state, "waiting (connected)"}],
[{subscription_id, 2}, {state, "waiting (connected)"}]],
Consumers1),
Stream2 = <<"list_group_consumers_run_2">>,
OptsGroup2 =
maps:merge(#{stream => Stream2, reference => ConsumerReference},
Opts),
create_stream(S, Stream2, C),
subscribe(S, 3, Stream2, SubProperties, C),
handle_consumer_update(S, C, 3),
subscribe(S, 4, Stream2, SubProperties, C),
subscribe(S, 5, Stream2, SubProperties, C),
?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
{ok, Consumers2} =
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
?assertEqual([[{subscription_id, 3}, {state, "active (connected)"}],
[{subscription_id, 4}, {state, "waiting (connected)"}],
[{subscription_id, 5}, {state, "waiting (connected)"}]],
Consumers2),
delete_stream(S, Stream1, C),
delete_stream(S, Stream2, C),
%% groups are gone once their streams are deleted
{error, not_found} =
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
close(S, C),
ok.
%% validate/2: command takes no positional args; stream and reference
%% options are mandatory.
activate_consumer_validate(_) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
ValidOpts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{vhost => <<"test">>})),
?assertMatch({validation_failure, too_many_args},
Cmd:validate([<<"foo">>], ValidOpts)),
?assertMatch(ok, Cmd:validate([], ValidOpts)).
%% merge_defaults/2: vhost defaults to "/"; an explicit vhost is preserved.
activate_consumer_merge_defaults(_Config) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
Opts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertEqual({[], Opts},
Cmd:merge_defaults([], maps:without([vhost], Opts))),
Merged = maps:merge(Opts, #{vhost => "vhost"}),
?assertEqual({[], Merged},
Cmd:merge_defaults([], Merged)).
%% run/2 for ActivateStreamConsumer: not_found on a missing group, ok once
%% a single-active-consumer group with three subscribers exists.
activate_consumer_run(Config) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =#{node => Node,
timeout => 10000,
vhost => <<"/">>},
Args = [],
%% stream named after the test case
St = atom_to_binary(?FUNCTION_NAME, utf8),
ConsumerReference = <<"foo">>,
OptsGroup = maps:merge(#{stream => St, reference => ConsumerReference},
Opts),
%% the group does not exist yet
?assertEqual({error, not_found}, Cmd:run(Args, OptsGroup)),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
SubProperties =#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => ConsumerReference},
create_stream(S, St, C),
subscribe(S, 0, St, SubProperties, C),
handle_consumer_update(S, C, 0),
subscribe(S, 1, St, SubProperties, C),
subscribe(S, 2, St, SubProperties, C),
?awaitMatch(3, consumer_count(Config), ?WAIT),
?assertEqual(ok, Cmd:run(Args, OptsGroup)),
delete_stream(S, St, C),
close(S, C),
ok.
%% Waits for the broker's consumer_update request for SubId (sent when a
%% single-active-consumer subscription becomes active) and replies OK with
%% the `next` offset specification. Returns the updated connection state.
handle_consumer_update(S, C0, SubId) ->
{{request, CorrId, {consumer_update, SubId, true}}, C1} =
rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0),
ConsumerUpdateCmd =
{response, CorrId, {consumer_update, ?RESPONSE_CODE_OK, next}},
ConsumerUpdateFrame = rabbit_stream_core:frame(ConsumerUpdateCmd),
ok = gen_tcp:send(S, ConsumerUpdateFrame),
C1.
%% Asserts that a consumer-group record (a proplist) carries the expected
%% stream, reference, partition index and consumer count.
assertConsumerGroup(Stream, Reference, PartitionIndex, Consumers, Record) ->
    Expected = [{stream, Stream},
                {reference, Reference},
                {partition_index, PartitionIndex},
                {consumers, Consumers}],
    [?assertEqual(Value, proplists:get_value(Key, Record))
     || {Key, Value} <- Expected],
    ok.
%% validate/2: a stream name is mandatory, --all/--offset/--writer are
%% mutually exclusive, and only one positional arg is allowed.
list_stream_tracking_validate(_) ->
ValidOpts = #{vhost => <<"/">>, <<"writer">> => true},
?assertMatch({validation_failure, not_enough_args},
?COMMAND_LIST_STREAM_TRACKING:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
?COMMAND_LIST_STREAM_TRACKING:validate([],
#{vhost =>
<<"test">>})),
?assertMatch({validation_failure, "Specify only one of --all, --offset, --writer."},
?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>],
#{all => true, writer => true})),
?assertMatch({validation_failure, too_many_args},
?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>, <<"bad">>],
ValidOpts)),
?assertMatch(ok,
?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>],
ValidOpts)).
%% merge_defaults/2: defaults to --all and vhost "/"; explicit tracking
%% type and vhost are preserved.
list_stream_tracking_merge_defaults(_Config) ->
?assertMatch({[<<"s">>], #{all := true, vhost := <<"/">>}},
?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{})),
?assertMatch({[<<"s">>], #{writer := true, vhost := <<"/">>}},
?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{writer => true})),
?assertMatch({[<<"s">>], #{all := true, vhost := <<"dev">>}},
?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{vhost => <<"dev">>})),
?assertMatch({[<<"s">>], #{writer := true, vhost := <<"dev">>}},
?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{writer => true, vhost => <<"dev">>})).
%% run/2 for ListStreamTracking: stores consumer offsets and a writer
%% (publisher) sequence on one stream, then checks the --all/--offset/
%% --writer filters return the expected tracking entries.
list_stream_tracking_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Stream = <<"list_stream_tracking_run">>,
ConsumerReference = <<"foo">>,
PublisherReference = <<"bar">>,
Opts =
#{node => Node,
timeout => 10000,
vhost => <<"/">>},
Args = [Stream],
%% the stream does not exist yet
?assertMatch({error, "The stream does not exist."},
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
create_stream(S, Stream, C),
%% stream exists but carries no tracking yet
?assertMatch([],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
store_offset(S, Stream, ConsumerReference, 42, C),
?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 42}]],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 42}]],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})),
%% a newer stored offset overwrites the previous value
ok = store_offset(S, Stream, ConsumerReference, 55, C),
?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 55}]],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})),
PublisherId = 1,
rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PublisherId,
PublisherReference, Stream, C),
%% publishing with id 42 creates writer tracking for the reference
rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 42, <<"">>, C),
ok = check_publisher_sequence(S, Stream, PublisherReference, 42, C),
?assertMatch([
[{type,writer},{name,<<"bar">>},{tracking_value, 42}],
[{type,offset},{name,<<"foo">>},{tracking_value, 55}]
],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})),
?assertMatch([
[{type,writer},{name,<<"bar">>},{tracking_value, 42}]
],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})),
rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 66, <<"">>, C),
ok = check_publisher_sequence(S, Stream, PublisherReference, 66, C),
?assertMatch([
[{type,writer},{name,<<"bar">>},{tracking_value, 66}]
],
?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})),
delete_stream(S, Stream, C),
close(S, C),
ok.
%% validate/2: no positional args allowed; stream and reference are
%% mandatory; the reference must be at most 255 bytes.
reset_offset_validate(_) ->
Cmd = ?COMMAND_RESET_OFFSET,
ValidOpts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{vhost => <<"test">>})),
?assertMatch({validation_failure, too_many_args},
Cmd:validate([<<"foo">>], ValidOpts)),
?assertMatch({validation_failure, reference_too_long},
Cmd:validate([], ValidOpts#{reference => gen_bin(256)})),
?assertMatch(ok, Cmd:validate([], ValidOpts)),
%% 255 bytes is the boundary: still accepted
?assertMatch(ok, Cmd:validate([], ValidOpts#{reference => gen_bin(255)})).
%% merge_defaults/2: vhost defaults to "/"; an explicit vhost is preserved.
reset_offset_merge_defaults(_Config) ->
Cmd = ?COMMAND_RESET_OFFSET,
Opts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertEqual({[], Opts},
Cmd:merge_defaults([], maps:without([vhost], Opts))),
Merged = maps:merge(Opts, #{vhost => "vhost"}),
?assertEqual({[], Merged},
Cmd:merge_defaults([], Merged)).
%% run/2 for ResetOffset: not_found on a missing stream, no_reference when
%% nothing was stored, then resets a stored offset of 42 back to 0.
reset_offset_run(Config) ->
Cmd = ?COMMAND_RESET_OFFSET,
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =#{node => Node,
timeout => 10000,
vhost => <<"/">>},
Args = [],
%% stream named after the test case
St = atom_to_binary(?FUNCTION_NAME, utf8),
Ref = <<"foo">>,
OptsGroup = maps:merge(#{stream => St, reference => Ref},
Opts),
%% the stream does not exist yet
?assertMatch({error, not_found},
Cmd:run(Args, OptsGroup)),
Port = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(Port),
create_stream(S, St, C),
%% stream exists but no offset has been stored for the reference
?assertEqual({error, no_reference}, Cmd:run(Args, OptsGroup)),
store_offset(S, St, Ref, 42, C),
check_stored_offset(S, St, Ref, 42, C),
?assertMatch(ok, Cmd:run(Args, OptsGroup)),
%% reset leaves the stored offset at 0
check_stored_offset(S, St, Ref, 0, C),
delete_stream(S, St, C),
close(S, C),
ok.
%% merge_defaults/2: defaults to 3 partitions and vhost "/"; when binding
%% keys are given (directly or via the routing_keys alias, which is
%% normalized to binding_keys) no partitions default is added.
add_super_stream_merge_defaults(_Config) ->
?assertMatch({[<<"super-stream">>],
#{partitions := 3, vhost := <<"/">>}},
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
#{})),
?assertMatch({[<<"super-stream">>],
#{partitions := 5, vhost := <<"/">>}},
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
#{partitions => 5})),
DefaultWithBindingKeys =
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
#{binding_keys =>
<<"amer,emea,apac">>}),
?assertMatch({[<<"super-stream">>],
#{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
DefaultWithBindingKeys),
{_, OptsBks} = DefaultWithBindingKeys,
?assertEqual(false, maps:is_key(partitions, OptsBks)),
DefaultWithRoutingKeys =
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
#{routing_keys =>
<<"amer,emea,apac">>}),
?assertMatch({[<<"super-stream">>],
#{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
DefaultWithRoutingKeys),
{_, OptsRks} = DefaultWithRoutingKeys,
?assertEqual(false, maps:is_key(partitions, OptsRks)).
%% validate/2 for AddSuperStream: exactly one name; partitions and
%% binding/routing keys are mutually exclusive; partitions must be > 0;
%% then a table of retention/locator/cluster-size option values vs.
%% expected validation outcome.
add_super_stream_validate(_Config) ->
?assertMatch({validation_failure, not_enough_args},
?COMMAND_ADD_SUPER_STREAM:validate([], #{})),
?assertMatch({validation_failure, too_many_args},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})),
?assertMatch({validation_failure, _},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{partitions => 1,
routing_keys =>
<<"a,b,c">>})),
?assertMatch({validation_failure, _},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{partitions => 1,
binding_keys => <<"a,b,c">>})),
?assertMatch({validation_failure, _},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{routing_keys => 1,
binding_keys => <<"a,b,c">>}
)),
?assertMatch({validation_failure, _},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{partitions => 0})),
?assertEqual(ok,
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{partitions => 5})),
?assertEqual(ok,
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{routing_keys =>
<<"a,b,c">>})),
?assertEqual(ok,
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{binding_keys =>
<<"a,b,c">>})),
%% data-driven checks: each option map against its expected outcome
[case Expected of
ok ->
?assertEqual(ok,
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts));
error ->
?assertMatch({validation_failure, _},
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts))
end
|| {Opts, Expected}
<- [{#{max_length_bytes => 1000}, ok},
{#{max_length_bytes => <<"1000">>}, ok},
{#{max_length_bytes => <<"100gb">>}, ok},
{#{max_length_bytes => <<"50mb">>}, ok},
{#{max_length_bytes => <<"50bm">>}, error},
{#{max_age => <<"PT10M">>}, ok},
{#{max_age => <<"P5DT8H">>}, ok},
{#{max_age => <<"foo">>}, error},
{#{stream_max_segment_size_bytes => 1000}, ok},
{#{stream_max_segment_size_bytes => <<"1000">>}, ok},
{#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
{#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
{#{stream_max_segment_size_bytes => <<"50bm">>}, error},
{#{leader_locator => <<"client-local">>}, ok},
{#{leader_locator => <<"least-leaders">>}, ok},
{#{leader_locator => <<"random">>}, ok},
{#{leader_locator => <<"foo">>}, error},
{#{initial_cluster_size => <<"1">>}, ok},
{#{initial_cluster_size => <<"2">>}, ok},
{#{initial_cluster_size => <<"3">>}, ok},
{#{initial_cluster_size => <<"0">>}, error},
{#{initial_cluster_size => <<"-1">>}, error},
{#{initial_cluster_size => <<"foo">>}, error}]],
ok.
%% merge_defaults/2: only vhost (defaulting to "/") is added.
delete_super_stream_merge_defaults(_Config) ->
?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}},
?COMMAND_DELETE_SUPER_STREAM_CLI:merge_defaults([<<"super-stream">>],
#{})),
ok.
%% validate/2 for DeleteSuperStream: exactly one super stream name.
delete_super_stream_validate(_Config) ->
    ?assertMatch({validation_failure, not_enough_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([], #{})),
    ?assertMatch({validation_failure, too_many_args},
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>, <<"b">>],
                                                           #{})),
    %% Happy path: a single name validates. The original test called the
    %% Add command's validate/2 here (copy-paste error), so the Delete
    %% command's success case was never actually exercised.
    ?assertEqual(ok,
                 ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>], #{})),
    ok.
%% run/2 round-trips for Add/DeleteSuperStream: partition-count mode
%% (invoices-0..2), binding-key mode (keys are trimmed of surrounding
%% whitespace, partitions named invoices-<key>), and extra stream arguments
%% which must land on the partition queues as x-* arguments.
add_delete_super_stream_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
vhost => <<"/">>},
% with number of partitions
?assertMatch({ok, _},
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
maps:merge(#{partitions => 3},
Opts))),
?assertEqual({ok,
[<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
partitions(Config, <<"invoices">>)),
?assertMatch({ok, _},
?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
?assertEqual({error, stream_not_found},
partitions(Config, <<"invoices">>)),
% with binding keys (note the surrounding whitespace, which must be trimmed)
?assertMatch({ok, _},
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
maps:merge(#{binding_keys => <<" amer,emea , apac">>},
Opts))),
?assertEqual({ok,
[<<"invoices-amer">>, <<"invoices-emea">>,
<<"invoices-apac">>]},
partitions(Config, <<"invoices">>)),
?assertMatch({ok, _},
?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
?assertEqual({error, stream_not_found},
partitions(Config, <<"invoices">>)),
% with arguments
ExtraOptions =
#{partitions => 3,
max_length_bytes => <<"50mb">>,
max_age => <<"PT10M">>,
stream_max_segment_size_bytes => <<"1mb">>,
leader_locator => <<"random">>,
initial_cluster_size => <<"1">>},
?assertMatch({ok, _},
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
maps:merge(ExtraOptions, Opts))),
{ok, Q} = queue_lookup(Config, <<"invoices-0">>),
Args = amqqueue:get_arguments(Q),
%% human-readable option values are converted: "1mb" -> 1000000 bytes,
%% "PT10M" -> "600s", "50mb" -> 50000000 bytes
?assertMatch({_, <<"random">>},
rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)),
?assertMatch({_, 1},
rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)),
?assertMatch({_, 1000000},
rabbit_misc:table_lookup(Args,
<<"x-stream-max-segment-size-bytes">>)),
?assertMatch({_, <<"600s">>},
rabbit_misc:table_lookup(Args, <<"x-max-age">>)),
?assertMatch({_, 50000000},
rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)),
?assertMatch({_, <<"stream">>},
rabbit_misc:table_lookup(Args, <<"x-queue-type">>)),
?assertMatch({ok, _},
?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)),
ok.
%% RPC to rabbit_stream_manager:partitions/2 on node 0 for vhost "/":
%% returns {ok, PartitionNames} or {error, stream_not_found}.
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
partitions,
[<<"/">>, Name]).
%% Creates Stream over the stream protocol; returns the updated connection state.
create_stream(S, Stream, C0) ->
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).
%% Subscribes SubId to Stream with the given subscription properties,
%% expecting an OK response; returns the updated connection state.
subscribe(S, SubId, Stream, SubProperties, C) ->
rabbit_stream_SUITE:test_subscribe(gen_tcp,
S,
SubId,
Stream,
SubProperties,
?RESPONSE_CODE_OK,
C).
%% Subscribes SubId to Stream with no extra properties; returns updated state.
subscribe(S, SubId, Stream, C) ->
rabbit_stream_SUITE:test_subscribe(gen_tcp, S, SubId, Stream, C).
%% Declares publisher PubId on Stream; returns the updated connection state.
declare_publisher(S, PubId, Stream, C) ->
rabbit_stream_SUITE:test_declare_publisher(gen_tcp,
S,
PubId,
Stream,
C).
%% Deletes Stream; returns the updated connection state.
delete_stream(S, Stream, C) ->
rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C).
%% Deletes Stream with the extra `false` flag -- presumably skipping the
%% metadata-update check (see rabbit_stream_SUITE:test_delete_stream/5).
delete_stream_no_metadata_update(S, Stream, C) ->
rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C, false).
%% Consumes the metadata_update frame signalling that Stream was deleted
%% (see rabbit_stream_SUITE); returns the updated connection state.
metadata_update_stream_deleted(S, Stream, C) ->
rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp,
S,
Stream,
C).
%% Gracefully closes the stream-protocol connection.
close(S, C) ->
rabbit_stream_SUITE:test_close(gen_tcp, S, C).
%% Default CLI option map used by the *_count/1 helpers. The vhost entry is
%% only needed by list_consumers/list_publishers but is harmless elsewhere.
options(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    #{node => Node,
      timeout => 10000,
      verbose => false,
      vhost => <<"/">>}.
%% Flattens the (possibly nested, per-connection) result lists produced by
%% a CLI command run into a flat list of records, where a "record" is a
%% non-empty proplist ([{Key, Value} | _]).
%% Note: the accumulator is reversed every time an empty list is reached,
%% so ordering is only guaranteed for a flat input list of records; nested
%% input may come back reordered (callers only count or pattern-match).
flatten_command_result([], Acc) ->
    %% Single base clause: lists:reverse([]) =:= [], so the original
    %% redundant ([], []) -> [] clause is not needed.
    lists:reverse(Acc);
flatten_command_result([[{_K, _V} | _RecordRest] = Record | Rest], Acc) ->
    %% Head is a record (proplist): keep it as one unit.
    flatten_command_result(Rest, [Record | Acc]);
flatten_command_result([H | T], Acc) ->
    %% Head is a nested list of records: fold it into the accumulator first.
    Acc1 = flatten_command_result(H, Acc),
    flatten_command_result(T, Acc1).
%% Materializes the Elixir enumerable returned by a command run and
%% flattens per-connection sub-lists into a flat list of records.
to_list(CommandRun) ->
Lists = 'Elixir.Enum':to_list(CommandRun),
%% we can get results from different connections, so we flatten out
flatten_command_result(Lists, []).
%% Number of records in a command run's result.
command_result_count(CommandRun) ->
length(to_list(CommandRun)).
%% Current number of stream connections as seen by the CLI command.
connection_count(Config) ->
command_result_count(?COMMAND_LIST_CONNECTIONS:run([<<"conn_name">>],
options(Config))).
%% Current number of stream consumers as seen by the CLI command.
consumer_count(Config) ->
command_result_count(?COMMAND_LIST_CONSUMERS:run([<<"stream">>],
options(Config))).
%% Current number of stream publishers as seen by the CLI command.
publisher_count(Config) ->
command_result_count(?COMMAND_LIST_PUBLISHERS:run([<<"stream">>],
options(Config))).
%% Opens a plain-TCP stream-protocol connection.
start_stream_connection(Port) ->
start_stream_connection(gen_tcp, Port).
%% Opens a TLS stream-protocol connection.
start_stream_tls_connection(Port) ->
start_stream_connection(ssl, Port).
%% Connects to localhost:Port over gen_tcp or ssl (peer verification
%% disabled for TLS), performs the peer-properties and authenticate
%% handshakes, and returns {Socket, ConnectionState}.
start_stream_connection(Transport, Port) ->
TlsOpts = case Transport of
ssl -> [{verify, verify_none}];
_ -> []
end,
{ok, S} =
Transport:connect("localhost", Port,
[{active, false}, {mode, binary}] ++ TlsOpts),
C0 = rabbit_stream_core:init(0),
C1 = rabbit_stream_SUITE:test_peer_properties(Transport, S, C0),
C = rabbit_stream_SUITE:test_authenticate(Transport, S, C1),
{S, C}.
%% Opens an AMQP 0-9-1 connection (network or direct); crashes on failure.
start_amqp_connection(Type, Node, Port) ->
Params = amqp_params(Type, Node, Port),
{ok, _Connection} = amqp_connection:start(Params).
%% Builds AMQP connection parameters: network connections use only the
%% port, direct connections go through the Erlang node.
amqp_params(network, _, Port) ->
#amqp_params_network{port = Port};
amqp_params(direct, Node, _) ->
#amqp_params_direct{node = Node}.
%% RPC lookup of queue Q in vhost "/" on node 0; returns rabbit_amqqueue:lookup/1's result.
queue_lookup(Config, Q) ->
QueueName = rabbit_misc:r(<<"/">>, queue, Q),
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_amqqueue,
lookup,
[QueueName]).
%% Sends a store_offset frame (fire-and-forget, no response from the
%% broker) and then polls with query_offset until the value is visible.
%% Returns ok, or {error, offset_not_stored} after 20 attempts.
store_offset(S, Stream, Reference, Value, C) ->
StoreOffsetFrame =
rabbit_stream_core:frame({store_offset, Reference, Stream, Value}),
ok = gen_tcp:send(S, StoreOffsetFrame),
case check_stored_offset(S, Stream, Reference, Value, C, 20) of
ok ->
ok;
_ ->
{error, offset_not_stored}
end.
%% Polls for a stored offset with the default 20 attempts.
check_stored_offset(S, Stream, Reference, Expected, C) ->
check_stored_offset(S, Stream, Reference, Expected, C, 20).
%% Sends query_offset (fixed correlation id 1) and compares the stored
%% value with Expected; retries every 50 ms until it matches or Attempt
%% reaches 0 (returning the atom `error`).
check_stored_offset(_, _, _, _, _, 0) ->
error;
check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
QueryOffsetFrame =
rabbit_stream_core:frame({request, 1, {query_offset, Reference, Stream}}),
ok = gen_tcp:send(S, QueryOffsetFrame),
{Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
%% the response itself must always be OK; only the value may lag behind
?assertMatch({response, 1, {query_offset, ?RESPONSE_CODE_OK, _}}, Cmd),
{response, 1, {query_offset, ?RESPONSE_CODE_OK, StoredValue}} = Cmd,
case StoredValue of
Expected ->
ok;
_ ->
timer:sleep(50),
check_stored_offset(S, Stream, Reference, Expected, C, Attempt - 1)
end.
%% Polls for a publisher sequence with the default 20 attempts.
check_publisher_sequence(S, Stream, Reference, Expected, C) ->
check_publisher_sequence(S, Stream, Reference, Expected, C, 20).
%% Sends query_publisher_sequence (fixed correlation id 1) and compares the
%% stored value with Expected; retries every 50 ms until it matches or
%% Attempt reaches 0 (returning the atom `error`).
check_publisher_sequence(_, _, _, _, _, 0) ->
error;
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
QueryFrame =
rabbit_stream_core:frame({request, 1, {query_publisher_sequence, Reference, Stream}}),
ok = gen_tcp:send(S, QueryFrame),
{Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C),
?assertMatch({response, 1, {query_publisher_sequence, _, _}}, Cmd),
{response, 1, {query_publisher_sequence, _, StoredValue}} = Cmd,
case StoredValue of
Expected ->
ok;
_ ->
timer:sleep(50),
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1)
end.
%% Builds a binary of L copies of the byte $a (used for reference-length
%% boundary checks in reset_offset_validate).
gen_bin(L) ->
    binary:copy(<<"a">>, L).