%% rabbitmq-server/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
%%
%% 1783 lines
%% 72 KiB
%% Erlang
%%
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include("rabbit_stream_metrics.hrl").
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-import(rabbit_stream_core, [frame/1]).
-import(rabbit_ct_broker_helpers, [rpc/5]).
-import(rabbit_ct_helpers, [await_condition/1]).
-define(WAIT, 5000).
%% Common Test entry point: the test groups to run, in order.
all() ->
    [{group, Name} || Name <- [single_node, single_node_1, cluster]].
%% Common Test group definitions.
%%
%% Returns one `{GroupName, Properties, TestCases}' tuple per group
%% referenced from all/0.
groups() ->
    SingleNodeCases =
        [test_stream,
         test_stream_tls,
         test_publish_v2,
         test_super_stream_creation_deletion,
         test_gc_consumers,
         test_gc_publishers,
         test_update_secret,
         cannot_update_username_after_authenticated,
         cannot_use_another_authmechanism_when_updating_secret,
         update_secret_should_close_connection_if_wrong_secret,
         update_secret_should_close_connection_if_unauthorized_vhost,
         unauthenticated_client_rejected_tcp_connected,
         timeout_tcp_connected,
         unauthenticated_client_rejected_peer_properties_exchanged,
         timeout_peer_properties_exchanged,
         unauthenticated_client_rejected_authenticating,
         timeout_authenticating,
         timeout_close_sent,
         max_segment_size_bytes_validation,
         close_connection_on_consumer_update_timeout,
         set_filter_size,
         vhost_queue_limit,
         connection_should_be_closed_on_token_expiry,
         should_receive_metadata_update_after_update_secret,
         store_offset_requires_read_access,
         offset_lag_calculation,
         test_super_stream_duplicate_partitions,
         authentication_error_should_close_with_delay,
         unauthorized_vhost_access_should_close_with_delay,
         sasl_anonymous,
         test_publisher_with_too_long_reference_errors,
         test_consumer_with_too_long_reference_errors,
         subscribe_unsubscribe_should_create_events,
         test_stream_test_utils,
         sac_subscription_with_partition_index_conflict_should_return_error,
         test_metadata_with_advertised_hints,
         test_connection_properties_with_advertised_hints],
    ClusterCases = [test_stream, test_stream_tls, test_metadata, java],
    [{single_node, [], SingleNodeCases},
     %% `test_global_counters` runs in a group of its own so that the
     %% global metrics start from 0 when it executes.
     {single_node_1, [], [test_global_counters]},
     {cluster, [], ClusterCases}].
%% Suite setup: the whole suite is skipped when running against a
%% mixed-version cluster; otherwise the environment is logged and the
%% configuration is returned unchanged.
init_per_suite(Config) ->
    MixedVersions = rabbit_ct_helpers:is_mixed_versions(),
    if
        MixedVersions =:= true ->
            {skip, "mixed version clusters are not supported"};
        true ->
            rabbit_ct_helpers:log_environment(),
            Config
    end.
%% Suite teardown: nothing to clean up, the configuration is handed
%% back untouched.
end_per_suite(Config) -> Config.
%% Group setup.
%%
%% * single_node / single_node_1: non-clustered broker, with
%%   `core_metrics_gc_interval' set to 1000 and the stream plugin's
%%   `connection_negotiation_step_timeout' set to 500. The single_node
%%   group additionally forces a set of stream feature flags on at
%%   broker init.
%% * cluster: a 3-node cluster with aten's `poll_interval' set to 1000.
%% * any other group: only the generic setup steps are run.
init_per_group(Group, Config)
  when Group =:= single_node orelse Group =:= single_node_1 ->
    BaseConfig = rabbit_ct_helpers:set_config(
                   Config,
                   [{rmq_nodes_clustered, false},
                    {rabbitmq_ct_tls_verify, verify_none},
                    {rabbitmq_stream, verify_none}]),
    %% Builds a setup step that merges AppEnv into the application
    %% environment of the step configuration.
    MergeEnv =
        fun(AppEnv) ->
                fun(StepConfig) ->
                        rabbit_ct_helpers:merge_app_env(StepConfig, AppEnv)
                end
        end,
    FeatureFlagSteps =
        case Group of
            single_node ->
                [MergeEnv({rabbit,
                           [{forced_feature_flags_on_init,
                             [stream_queue,
                              stream_sac_coordinator_unblock_group,
                              stream_single_active_consumer]}]})];
            _ ->
                []
        end,
    Steps = [MergeEnv({rabbit, [{core_metrics_gc_interval, 1000}]}),
             MergeEnv({rabbitmq_stream,
                       [{connection_negotiation_step_timeout, 500}]})]
            ++ FeatureFlagSteps
            ++ rabbit_ct_broker_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(BaseConfig, Steps);
init_per_group(cluster = Group, Config) ->
    ClusterConfig = rabbit_ct_helpers:set_config(
                      Config,
                      [{rmq_nodes_clustered, true},
                       {rmq_nodes_count, 3},
                       {rmq_nodename_suffix, Group},
                       {tcp_ports_base},
                       {rabbitmq_ct_tls_verify, verify_none},
                       %% we kill stream members in some tests
                       {find_crashes, false}]),
    SetAtenPollInterval =
        fun(StepConfig) ->
                rabbit_ct_helpers:merge_app_env(StepConfig,
                                                {aten, [{poll_interval, 1000}]})
        end,
    Steps = [SetAtenPollInterval | rabbit_ct_broker_helpers:setup_steps()],
    rabbit_ct_helpers:run_setup_steps(ClusterConfig, Steps);
init_per_group(_Group, Config) ->
    rabbit_ct_helpers:run_setup_steps(Config).
%% Group teardown: the `java' group only runs the generic teardown
%% steps, every other group runs the broker teardown steps.
end_per_group(java, Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config);
end_per_group(_Group, Config) ->
    TeardownSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, TeardownSteps).
%% Per-testcase setup. Some test cases need extra users, permissions,
%% application settings or vhost limits on node 0 before they start;
%% all the others only get registered as started.
init_per_testcase(TestCase, Config)
  when TestCase =:= cannot_update_username_after_authenticated orelse
       TestCase =:= unauthorized_vhost_access_should_close_with_delay ->
    %% these tests need a second user, without any permission
    ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
    rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase,
                  Config) ->
    User = <<"other">>,
    ok = rabbit_ct_broker_helpers:add_user(Config, User),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
    %% lowered request timeout, restored in end_per_testcase/2
    SetEnvArgs = [rabbitmq_stream, request_timeout, 2000],
    ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, SetEnvArgs),
    rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(vhost_queue_limit = TestCase, Config) ->
    %% leave room for exactly 5 more queues in the default vhost
    Existing = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, count,
                                            [<<"/">>]),
    ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>,
                                                  max_queues, Existing + 5),
    rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
    ok = rabbit_ct_broker_helpers:add_user(Config, <<"test">>),
    rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(TestCase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, TestCase).
%% Per-testcase teardown: undoes whatever init_per_testcase/2 or the
%% test case itself changed on node 0 (passwords, users, application
%% settings, vhost limits), then registers the test case as finished.
end_per_testcase(test_update_secret = TestCase, Config) ->
    %% the test changes the guest password, put the default one back
    ok = rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"guest">>),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config)
  when TestCase =:= cannot_update_username_after_authenticated orelse
       TestCase =:= update_secret_should_close_connection_if_unauthorized_vhost orelse
       TestCase =:= unauthorized_vhost_access_should_close_with_delay ->
    ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
    SetEnvArgs = [rabbitmq_stream, request_timeout, 60000],
    ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, SetEnvArgs),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(vhost_queue_limit = TestCase, Config) ->
    %% the result of clearing the limits is deliberately not asserted
    _ = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost_limit, clear,
                                     [<<"/">>, <<"guest">>]),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
    ok = rabbit_ct_broker_helpers:delete_user(Config, <<"test">>),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config)
  when TestCase =:= test_metadata_with_advertised_hints orelse
       TestCase =:= test_connection_properties_with_advertised_hints ->
    %% reset every advertised host/port setting to `undefined'
    ResetKey =
        fun(Key) ->
                ok = rpc(Config, 0, application, set_env,
                         [rabbitmq_stream, Key, undefined])
        end,
    HintKeys = [?K_AD_HOST, ?K_AD_PORT, ?K_AD_TLS_HOST, ?K_AD_TLS_PORT],
    lists:foreach(ResetKey, HintKeys),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, TestCase).
%% Runs the generic TCP scenario (test_server/3) and then checks the
%% exact value of every global counter reported by
%% get_global_counters/1.
test_global_counters(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    test_server(gen_tcp, Stream, Config),
    ExpectedCounters =
        #{publishers => 0,
          consumers => 0,
          messages_confirmed_total => 2,
          messages_received_confirm_total => 2,
          messages_received_total => 2,
          messages_routed_total => 0,
          messages_unroutable_dropped_total => 0,
          messages_unroutable_returned_total => 0,
          stream_error_access_refused_total => 0,
          stream_error_authentication_failure_total => 0,
          stream_error_frame_too_large_total => 0,
          stream_error_internal_error_total => 0,
          stream_error_precondition_failed_total => 0,
          stream_error_publisher_does_not_exist_total => 0,
          stream_error_sasl_authentication_failure_loopback_total => 0,
          stream_error_sasl_challenge_total => 0,
          stream_error_sasl_error_total => 0,
          stream_error_sasl_mechanism_not_supported_total => 0,
          stream_error_stream_already_exists_total => 0,
          stream_error_stream_does_not_exist_total => 0,
          stream_error_stream_not_available_total => 1,
          stream_error_subscription_id_already_exists_total => 0,
          stream_error_subscription_id_does_not_exist_total => 0,
          stream_error_unknown_frame_total => 0,
          stream_error_vhost_access_failure_total => 0},
    ?assertEqual(ExpectedCounters, get_global_counters(Config)),
    ok.
%% Runs the generic scenario (test_server/3) over plain TCP, using a
%% stream named after this test case.
test_stream(Config) ->
    StreamName = atom_to_binary(?FUNCTION_NAME, utf8),
    _ = test_server(gen_tcp, StreamName, Config),
    ok.
%% Goes through the connection negotiation using anonymous SASL
%% authentication: peer properties, SASL handshake, anonymous
%% authenticate, then tune. Returns the state produced by tune/3.
sasl_anonymous(Config) ->
    Transport = gen_tcp,
    {ok, Socket} = gen_tcp:connect("localhost",
                                   get_port(Transport, Config),
                                   get_opts(Transport)),
    Initial = rabbit_stream_core:init(0),
    AfterProperties = test_peer_properties(Transport, Socket, Initial),
    AfterHandshake = sasl_handshake(Transport, Socket, AfterProperties),
    AfterAuth = test_anonymous_sasl_authenticate(Transport, Socket, AfterHandshake),
    tune(Transport, Socket, AfterAuth).
%% An authenticated connection must be able to authenticate again with
%% the new password after the user's password has been changed.
%% The original password is restored in end_per_testcase/2.
test_update_secret(Config) ->
    Transport = gen_tcp,
    {Socket, C0} = connect_and_authenticate(Transport, Config),
    rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"password">>),
    AuthResult = try_authenticate(Transport, Socket, C0,
                                  <<"PLAIN">>, <<"guest">>, <<"password">>),
    C1 = expect_successful_authentication(AuthResult),
    _ = test_close(Transport, Socket, C1),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Re-authenticating an already authenticated connection as a
%% different user ("other", created in init_per_testcase/2) must be
%% answered with ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME, after which the
%% socket is expected to be closed.
cannot_update_username_after_authenticated(Config) ->
    Transport = gen_tcp,
    {Socket, C0} = connect_and_authenticate(Transport, Config),
    AuthResult = try_authenticate(Transport, Socket, C0,
                                  <<"PLAIN">>, <<"other">>, <<"other">>),
    _ = expect_unsuccessful_authentication(AuthResult,
                                           ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Re-authenticating an already authenticated connection with another
%% SASL mechanism (EXTERNAL here) must be answered with
%% ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, after which the socket is
%% expected to be closed.
cannot_use_another_authmechanism_when_updating_secret(Config) ->
    Transport = gen_tcp,
    {Socket, C0} = connect_and_authenticate(Transport, Config),
    AuthResult = try_authenticate(Transport, Socket, C0,
                                  <<"EXTERNAL">>, <<"guest">>, <<"new_password">>),
    _ = expect_unsuccessful_authentication(AuthResult,
                                           ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Re-authenticating an already authenticated connection with a wrong
%% (random) password must be answered with
%% ?RESPONSE_AUTHENTICATION_FAILURE, after which the socket is expected
%% to be closed.
update_secret_should_close_connection_if_wrong_secret(Config) ->
    Transport = gen_tcp,
    {Socket, C0} = connect_and_authenticate(Transport, Config),
    WrongPassword = rand_bin(),
    AuthResult = try_authenticate(Transport, Socket, C0,
                                  <<"PLAIN">>, <<"guest">>, WrongPassword),
    _ = expect_unsuccessful_authentication(AuthResult,
                                           ?RESPONSE_AUTHENTICATION_FAILURE),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% User "other" authenticates, then loses its permissions on the
%% default vhost. Authenticating again on the same connection must be
%% answered with ?RESPONSE_VHOST_ACCESS_FAILURE, after which the socket
%% is expected to be closed.
update_secret_should_close_connection_if_unauthorized_vhost(Config) ->
    Transport = gen_tcp,
    User = <<"other">>,
    {ok, Socket} = Transport:connect("localhost",
                                     get_port(Transport, Config),
                                     get_opts(Transport)),
    Initial = rabbit_stream_core:init(0),
    AfterProperties = test_peer_properties(Transport, Socket, Initial),
    Authenticated = test_authenticate(Transport, Socket, AfterProperties, User),
    ok = rabbit_ct_broker_helpers:clear_permissions(Config, User, <<"/">>),
    AuthResult = try_authenticate(gen_tcp, Socket, Authenticated,
                                  <<"PLAIN">>, User, User),
    _ = expect_unsuccessful_authentication(AuthResult,
                                           ?RESPONSE_VHOST_ACCESS_FAILURE),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Runs the generic scenario (test_server/3) over TLS, using a stream
%% named after this test case.
test_stream_tls(Config) ->
    StreamName = atom_to_binary(?FUNCTION_NAME, utf8),
    _ = test_server(ssl, StreamName, Config),
    ok.
%% Publishes two messages with the `publish_v2' command, expects a
%% `publish_confirm' for each, then subscribes with a filter property
%% and expects both messages to be delivered (offsets 0 and 1).
test_publish_v2(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    Transport = gen_tcp,
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, S} = Transport:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    WithProperties = test_peer_properties(Transport, S, Initial),
    Authenticated = test_authenticate(Transport, S, WithProperties),
    WithStream = test_create_stream(Transport, S, Stream, Authenticated),

    PublisherId = 42,
    WithPublisher = test_declare_publisher(Transport, S, PublisherId, Stream,
                                           WithStream),
    Body = <<"hello">>,
    %% two publish/confirm round trips with the same body
    Published =
        lists:foldl(fun(_Round, Conn) ->
                            test_publish_confirm(Transport, S, publish_v2,
                                                 PublisherId, Body,
                                                 publish_confirm, Conn)
                    end, WithPublisher, [1, 2]),

    SubscriptionId = 42,
    Subscribed = test_subscribe(Transport, S, SubscriptionId, Stream,
                                #{<<"filter.0">> => <<"foo">>},
                                ?RESPONSE_CODE_OK,
                                Published),
    %% one delivery per published message, at offsets 0 then 1
    Delivered =
        lists:foldl(fun(Offset, Conn) ->
                            test_deliver(Transport, S, SubscriptionId, Offset,
                                         Body, Conn)
                    end, Subscribed, [0, 1]),

    Unsubscribed = test_unsubscribe(Transport, S, SubscriptionId, Delivered),
    StreamDeleted = test_delete_stream(Transport, S, Stream, Unsubscribed),
    _ = test_close(Transport, S, StreamDeleted),
    closed = wait_for_socket_close(Transport, S, 10),
    ok.
%% Super stream lifecycle over the stream protocol: create a super
%% stream with 3 partitions, list its partitions, resolve each binding
%% key to its partition, delete it, check it is gone, and finally check
%% that a creation request with a different number of partitions and
%% binding keys is rejected.
%%
%% NOTE(review): the parser state returned by the helpers is discarded
%% and the initial state is reused for every call; this looks
%% intentional but should be confirmed.
test_super_stream_creation_deletion(Config) ->
    T = gen_tcp,
    {ok, S} = T:connect("localhost", get_port(T, Config), get_opts(T)),
    Conn = rabbit_stream_core:init(0),
    test_peer_properties(T, S, Conn),
    test_authenticate(T, S, Conn),

    SuperStream = atom_to_binary(?FUNCTION_NAME, utf8),
    Indexes = lists:seq(0, 2),
    Partitions = [unicode:characters_to_binary([SuperStream, <<"-">>,
                                                integer_to_binary(I)])
                  || I <- Indexes],
    BindingKeys = [integer_to_binary(I) || I <- Indexes],

    %% creation
    ok = T:send(S, request({create_super_stream, SuperStream, Partitions,
                            BindingKeys, #{}})),
    {Created, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
                 Created),

    %% partition listing
    PartitionsFrame = request({partitions, SuperStream}),
    ok = T:send(S, PartitionsFrame),
    {Listed, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}},
                 Listed),

    %% each binding key must route to the partition with the same suffix
    lists:foreach(
      fun(Key) ->
              ok = T:send(S, request({route, Key, SuperStream})),
              {Routed, _} = receive_commands(T, S, Conn),
              ?assertMatch({response, 1, {route, ?RESPONSE_CODE_OK, _}}, Routed),
              {response, 1, {route, ?RESPONSE_CODE_OK, [Partition]}} = Routed,
              ?assertEqual(unicode:characters_to_binary([SuperStream, <<"-">>, Key]),
                           Partition)
      end, BindingKeys),

    %% deletion
    ok = T:send(S, request({delete_super_stream, SuperStream})),
    {Deleted, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
                 Deleted),

    %% the partitions are gone as well
    ok = T:send(S, PartitionsFrame),
    {ListedAfterDeletion, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []}},
                 ListedAfterDeletion),

    %% not the same number of partitions and binding keys
    ok = T:send(S, request({create_super_stream, SuperStream,
                            [<<"s1">>, <<"s2">>], [<<"bk1">>], #{}})),
    {Rejected, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Rejected),

    test_close(T, S, Conn),
    closed = wait_for_socket_close(T, S, 10),
    ok.
%% Creating a super stream whose partition list contains the same name
%% twice must be answered with ?RESPONSE_CODE_PRECONDITION_FAILED.
test_super_stream_duplicate_partitions(Config) ->
    T = gen_tcp,
    {ok, S} = T:connect("localhost", get_port(T, Config), get_opts(T)),
    Conn = rabbit_stream_core:init(0),
    test_peer_properties(T, S, Conn),
    test_authenticate(T, S, Conn),
    SuperStream = atom_to_binary(?FUNCTION_NAME, utf8),
    DuplicatedPartitions = [<<"same-name">>, <<"same-name">>],
    CreationFrame = request({create_super_stream, SuperStream,
                             DuplicatedPartitions, [<<"1">>, <<"2">>], #{}}),
    ok = T:send(S, CreationFrame),
    {Response, _} = receive_commands(T, S, Conn),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Response),
    test_close(T, S, Conn),
    closed = wait_for_socket_close(T, S, 10),
    ok.
%% For both plain TCP and TLS: creates a stream, waits until the
%% metadata command reports 3 nodes (leader + replicas), drains node 1
%% and expects the metadata to shrink to 2 nodes, then revives the node
%% and expects 3 nodes again before deleting the stream.
test_metadata(Config) ->
    FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
    CheckTransport =
        fun(Transport) ->
                TransportBin = atom_to_binary(Transport, utf8),
                Stream = <<FunctionName/binary, "_", TransportBin/binary>>,
                Port = get_port(Transport, Config),
                Opts = get_opts(Transport),
                FirstNode = get_node_name(Config, 0),
                NodeInMaintenance = get_node_name(Config, 1),
                {ok, S} = Transport:connect("localhost", Port, Opts),
                Initial = rabbit_stream_core:init(0),
                WithProperties = test_peer_properties(Transport, S, Initial),
                Authenticated = test_authenticate(Transport, S, WithProperties),
                Conn = test_create_stream(Transport, S, Stream, Authenticated),

                %% leader and replicas currently advertised for the stream
                GetStreamNodes =
                    fun() ->
                            ok = Transport:send(S, request({metadata, [Stream]})),
                            {CmdMetadata, _} = receive_commands(Transport, S, Conn),
                            {response, 1,
                             {metadata, _Nodes,
                              #{Stream := {Leader = {_H, _P}, Replicas}}}} =
                                CmdMetadata,
                            [Leader | Replicas]
                    end,
                StreamNodeCount = fun() -> length(GetStreamNodes()) end,
                IsBeingDrained =
                    fun() ->
                            rpc(Config, FirstNode,
                                rabbit_maintenance,
                                is_being_drained_consistent_read,
                                [NodeInMaintenance])
                    end,

                await_condition(fun() -> StreamNodeCount() == 3 end),

                rpc(Config, NodeInMaintenance, rabbit_maintenance, drain, []),
                await_condition(IsBeingDrained),
                await_condition(fun() -> StreamNodeCount() == 2 end),

                rpc(Config, NodeInMaintenance, rabbit_maintenance, revive, []),
                await_condition(fun() -> IsBeingDrained() =:= false end),
                await_condition(fun() -> StreamNodeCount() == 3 end),

                ok = Transport:send(S, request({delete_stream, Stream})),
                {CmdDelete, AfterDelete} = receive_commands(Transport, S, Conn),
                ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
                             CmdDelete),
                _ = test_close(Transport, S, AfterDelete),
                closed = wait_for_socket_close(Transport, S, 10)
        end,
    lists:foreach(CheckTransport, [gen_tcp, ssl]),
    ok.
%% Registers consumer metrics for a process that terminates right away
%% and waits (up to ?WAIT) for the consumer count to drop back to 0.
%% Presumably the cleanup is driven by the core metrics GC; confirm
%% against the broker code.
test_gc_consumers(Config) ->
    %% the spawned process returns immediately, so the pid is short-lived
    ShortLivedPid = spawn(fun() -> ok end),
    Queue = #resource{name = <<"test">>,
                      kind = queue,
                      virtual_host = <<"/">>},
    Args = [ShortLivedPid, Queue, 0, 10, 0, 0, 0, true, #{}, <<"guest">>],
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_stream_metrics, consumer_created, Args),
    ?awaitMatch(0, consumer_count(Config), ?WAIT),
    ok.
%% Registers publisher metrics for a process that terminates right away
%% and waits (up to ?WAIT) for the publisher count to drop back to 0.
%% Presumably the cleanup is driven by the core metrics GC; confirm
%% against the broker code.
test_gc_publishers(Config) ->
    %% the spawned process returns immediately, so the pid is short-lived
    ShortLivedPid = spawn(fun() -> ok end),
    Queue = #resource{name = <<"test">>,
                      kind = queue,
                      virtual_host = <<"/">>},
    Args = [ShortLivedPid, Queue, 0, <<"ref">>],
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 rabbit_stream_metrics, publisher_created, Args),
    ?awaitMatch(0, publisher_count(Config), ?WAIT),
    ok.
%% A client sending garbage right after the TCP connection is
%% established must get its socket closed.
unauthenticated_client_rejected_tcp_connected(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    ?assertEqual(ok, gen_tcp:send(Socket, <<"invalid data">>)),
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, Socket, 1)).
%% A client that connects and then stays silent must get its socket
%% closed by the server.
timeout_tcp_connected(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, Socket, 1)).
%% A client sending garbage after the peer properties exchange must get
%% its socket closed.
unauthenticated_client_rejected_peer_properties_exchanged(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    test_peer_properties(gen_tcp, Socket, Initial),
    ?assertEqual(ok, gen_tcp:send(Socket, <<"invalid data">>)),
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, Socket, 1)).
%% A client that stays silent after the peer properties exchange must
%% get its socket closed by the server.
timeout_peer_properties_exchanged(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    test_peer_properties(gen_tcp, Socket, Initial),
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, Socket, 1)).
%% A client that sends the SASL handshake request and then garbage
%% instead of authenticating must get its socket closed: sending on the
%% socket is retried (up to ?WAIT) until it fails with
%% `{error, closed}'.
unauthenticated_client_rejected_authenticating(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    test_peer_properties(gen_tcp, Socket, Initial),
    HandshakeFrame = request(sasl_handshake),
    ?assertEqual(ok, gen_tcp:send(Socket, HandshakeFrame)),
    ?awaitMatch({error, closed},
                gen_tcp:send(Socket, <<"invalid data">>),
                ?WAIT).
%% A client that completes the SASL handshake but then never sends an
%% authenticate request must get its socket closed by the server.
%%
%% The handshake is actually performed (request sent, response read via
%% sasl_handshake/3) so that the connection has moved past the peer
%% properties step before going silent. Building the handshake frame
%% without sending it would only repeat
%% timeout_peer_properties_exchanged/1.
%% NOTE(review): assumes the server applies its negotiation-step
%% timeout once the SASL handshake has been answered, as the test name
%% suggests — confirm against rabbit_stream_reader.
timeout_authenticating(Config) ->
    Port = get_stream_port(Config),
    {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
    C0 = rabbit_stream_core:init(0),
    C1 = test_peer_properties(gen_tcp, S, C0),
    %% consumes the handshake response, so no unread data is left on the
    %% socket when waiting for it to be closed
    _C2 = sasl_handshake(gen_tcp, S, C1),
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
%% After authenticating, sends a request carrying a command id that
%% does not exist. The server is expected to answer with a `close'
%% request (?RESPONSE_CODE_UNKNOWN_FRAME); the client never replies to
%% it and the server must then close the socket on its own.
timeout_close_sent(Config) ->
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, Socket} = gen_tcp:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    WithProperties = test_peer_properties(gen_tcp, Socket, Initial),
    Authenticated = test_authenticate(gen_tcp, Socket, WithProperties),
    % Trigger rabbit_stream_reader to transition to state close_sent
    UnknownCommand = 999,
    Payload = <<?REQUEST:1, UnknownCommand:15, ?VERSION_1:16>>,
    %% frame = 32-bit size prefix followed by the payload
    Frame = [<<(byte_size(Payload)):32>>, Payload],
    ok = gen_tcp:send(Socket, Frame),
    {{request, _CorrelationId,
      {close, ?RESPONSE_CODE_UNKNOWN_FRAME, <<"unknown frame">>}},
     _AfterClose} = receive_commands(gen_tcp, Socket, Authenticated),
    % Now, rabbit_stream_reader is in state close_sent.
    ?assertEqual(closed, wait_for_socket_close(gen_tcp, Socket, 1)).
%% Creating a stream with `stream-max-segment-size-bytes' set to
%% 3000000001 must be answered with ?RESPONSE_CODE_PRECONDITION_FAILED.
max_segment_size_bytes_validation(Config) ->
    Transport = gen_tcp,
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, S} = Transport:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    WithProperties = test_peer_properties(Transport, S, Initial),
    Authenticated = test_authenticate(Transport, S, WithProperties),
    Stream = <<"stream-max-segment-size">>,
    Arguments = #{<<"stream-max-segment-size-bytes">> => <<"3000000001">>},
    ok = Transport:send(S, request({create_stream, Stream, Arguments})),
    {Response, AfterCreate} = receive_commands(Transport, S, Authenticated),
    ?assertMatch({response, 1,
                  {create_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Response),
    test_close(Transport, S, AfterCreate),
    ok.
%% Subscribes as a single active consumer, receives the
%% `consumer_update' request and never answers it: the server is
%% expected to close the connection. A second connection is then used
%% to delete the stream.
%% init_per_testcase/2 lowers the stream `request_timeout' for this
%% test case.
close_connection_on_consumer_update_timeout(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    {ok, Socket, C0} = stream_test_utils:connect(Config, 0),
    {ok, C1} = stream_test_utils:create_stream(Socket, C0, Stream),
    SubId = 42,
    SacProperties = #{<<"single-active-consumer">> => <<"true">>,
                      <<"name">> => <<"foo">>},
    {ok, C2} = stream_test_utils:subscribe(Socket, C1, Stream, SubId, 10,
                                           SacProperties),
    {ConsumerUpdate, _} = receive_commands(Socket, C2),
    ?assertMatch({request, _, {consumer_update, SubId, true}}, ConsumerUpdate),
    %% no response is sent to the consumer update request
    closed = wait_for_socket_close(Socket, 10),

    %% clean up from a fresh connection
    {ok, CleanupSocket, Cleanup0} = stream_test_utils:connect(Config, 0),
    {ok, Cleanup1} = stream_test_utils:delete_stream(CleanupSocket, Cleanup0, Stream),
    stream_test_utils:close(CleanupSocket, Cleanup1),
    closed = wait_for_socket_close(CleanupSocket, 10),
    ok.
%% Sends create_stream requests with different
%% `stream-filter-size-bytes' values and checks the response code of
%% each: 128 is accepted, 15 and 256 are rejected with
%% ?RESPONSE_CODE_PRECONDITION_FAILED.
set_filter_size(Config) ->
    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    Transport = gen_tcp,
    SocketOpts = [{active, false}, {mode, binary}],
    {ok, S} = Transport:connect("localhost", get_stream_port(Config), SocketOpts),
    Initial = rabbit_stream_core:init(0),
    WithProperties = test_peer_properties(Transport, S, Initial),
    Authenticated = test_authenticate(Transport, S, WithProperties),
    %% {filter size, expected response code}
    Cases = [{128, ?RESPONSE_CODE_OK},
             {15, ?RESPONSE_CODE_PRECONDITION_FAILED},
             {256, ?RESPONSE_CODE_PRECONDITION_FAILED}],
    CheckCase =
        fun({Size, ExpectedResponseCode}, Conn) ->
                Arguments = #{<<"stream-filter-size-bytes">> => integer_to_binary(Size)},
                ok = Transport:send(S, request({create_stream, Stream, Arguments})),
                {Response, NextConn} = receive_commands(Transport, S, Conn),
                ?assertMatch({response, 1, {create_stream, ExpectedResponseCode}},
                             Response),
                NextConn
        end,
    Final = lists:foldl(CheckCase, Authenticated, Cases),
    _ = test_close(Transport, S, Final),
    closed = wait_for_socket_close(Transport, S, 10),
    ok.
%% Checks that the vhost `max_queues' limit (set in
%% init_per_testcase/2) is enforced for streams and super streams:
%%
%% 1. a super stream with exactly as many partitions as there are
%%    queue slots left can be created;
%% 2. once the limit is reached, creating another super stream or a
%%    plain stream is answered with ?RESPONSE_CODE_PRECONDITION_FAILED;
%% 3. after the super stream is deleted, a plain stream can be created
%%    (and deleted) again.
%%
%% The parser state returned by each helper is threaded through the
%% following call instead of being matched against the initial state,
%% and the connection is closed at the end like in the sibling tests.
vhost_queue_limit(Config) ->
    T = gen_tcp,
    Port = get_port(T, Config),
    Opts = get_opts(T),
    {ok, S} = T:connect("localhost", Port, Opts),
    C0 = rabbit_stream_core:init(0),
    C1 = test_peer_properties(T, S, C0),
    C2 = test_authenticate(T, S, C1),
    QueueCount = rabbit_ct_broker_helpers:rpc(Config,
                                              0,
                                              rabbit_amqqueue,
                                              count,
                                              [<<"/">>]),
    {ok, QueueLimit} = rabbit_ct_broker_helpers:rpc(Config,
                                                    0,
                                                    rabbit_vhost_limit,
                                                    queue_limit,
                                                    [<<"/">>]),
    %% lists:seq(0, PartitionCount) has PartitionCount + 1 elements,
    %% i.e. QueueLimit - QueueCount partitions: exactly the room left
    PartitionCount = QueueLimit - 1 - QueueCount,
    PartitionIndexes = lists:seq(0, PartitionCount),
    Name = atom_to_binary(?FUNCTION_NAME, utf8),
    Partitions = [unicode:characters_to_binary([Name, <<"-">>, integer_to_binary(N)])
                  || N <- PartitionIndexes],
    Bks = [integer_to_binary(N) || N <- PartitionIndexes],

    %% fills the vhost up to its queue limit
    SsCreationFrame = request({create_super_stream, Name, Partitions, Bks, #{}}),
    ok = T:send(S, SsCreationFrame),
    {Cmd1, C3} = receive_commands(T, S, C2),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
                 Cmd1),

    %% no room left for another super stream...
    SsCreationFrameKo = request({create_super_stream,
                                 <<"exceed-queue-limit">>,
                                 [<<"s1">>, <<"s2">>, <<"s3">>],
                                 [<<"1">>, <<"2">>, <<"3">>], #{}}),
    ok = T:send(S, SsCreationFrameKo),
    {Cmd2, C4} = receive_commands(T, S, C3),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Cmd2),

    %% ...nor for a plain stream
    CreateStreamFrame = request({create_stream, <<"exceed-queue-limit">>, #{}}),
    ok = T:send(S, CreateStreamFrame),
    {Cmd3, C5} = receive_commands(T, S, C4),
    ?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}}, Cmd3),

    %% deleting the super stream frees the queue slots
    SsDeletionFrame = request({delete_super_stream, Name}),
    ok = T:send(S, SsDeletionFrame),
    {Cmd4, C6} = receive_commands(T, S, C5),
    ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
                 Cmd4),

    ok = T:send(S, request({create_stream, Name, #{}})),
    {Cmd5, C7} = receive_commands(T, S, C6),
    ?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd5),

    ok = T:send(S, request({delete_stream, Name})),
    {Cmd6, C8} = receive_commands(T, S, C7),
    ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd6),

    %% do not leave the connection open for the rest of the group
    _ = test_close(T, S, C8),
    closed = wait_for_socket_close(T, S, 10),
    ok.
%% Mocks rabbit_access_control on node 0 so that the credentials of the
%% connection are reported as expiring 2 seconds from now, then expects
%% the server to close the authenticated connection.
%%
%% The mock is unloaded in an `after' clause: a failed assertion must
%% not leave rabbit_access_control mocked on the broker for the test
%% cases that run next.
connection_should_be_closed_on_token_expiry(Config) ->
    rabbit_ct_broker_helpers:setup_meck(Config),
    Mod = rabbit_access_control,
    ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
    try
        ok = rpc(Config, 0, meck, expect, [Mod, check_user_loopback, 2, ok]),
        ok = rpc(Config, 0, meck, expect, [Mod, check_vhost_access, 4, ok]),
        ok = rpc(Config, 0, meck, expect, [Mod, permission_cache_can_expire, 1, true]),
        %% `second' is the supported time unit name, `seconds' is a
        %% deprecated alias
        Expiry = os:system_time(second) + 2,
        ok = rpc(Config, 0, meck, expect, [Mod, expiry_timestamp, 1, Expiry]),

        T = gen_tcp,
        Port = get_port(T, Config),
        Opts = get_opts(T),
        {ok, S} = T:connect("localhost", Port, Opts),
        C0 = rabbit_stream_core:init(0),
        C1 = test_peer_properties(T, S, C0),
        _C2 = test_authenticate(T, S, C1),
        %% nothing else is sent: the server has to close on its own
        closed = wait_for_socket_close(T, S, 10)
    after
        ok = rpc(Config, 0, meck, unload, [Mod])
    end,
    ok.
%% A connection with a publisher on one stream and a subscription on
%% another re-authenticates while rabbit_stream_utils is mocked to deny
%% write and read access. The client is expected to receive one
%% `metadata_update' (?RESPONSE_CODE_STREAM_NOT_AVAILABLE) per impacted
%% stream, in any order.
%%
%% The mock is unloaded in an `after' clause: a failed assertion must
%% not leave rabbit_stream_utils mocked on the broker for the test
%% cases that run next. The two streams are deleted from a second
%% connection once the mock is gone.
should_receive_metadata_update_after_update_secret(Config) ->
    T = gen_tcp,
    Port = get_port(T, Config),
    Opts = get_opts(T),
    {ok, S} = T:connect("localhost", Port, Opts),
    C0 = rabbit_stream_core:init(0),
    C1 = test_peer_properties(T, S, C0),
    C2 = test_authenticate(T, S, C1),

    Prefix = atom_to_binary(?FUNCTION_NAME, utf8),
    PublishStream = <<Prefix/binary, <<"-publish">>/binary>>,
    C3 = test_create_stream(T, S, PublishStream, C2),
    ConsumeStream = <<Prefix/binary, <<"-consume">>/binary>>,
    C4 = test_create_stream(T, S, ConsumeStream, C3),

    C5 = test_declare_publisher(T, S, 1, PublishStream, C4),
    C6 = test_subscribe(T, S, 1, ConsumeStream, C5),

    rabbit_ct_broker_helpers:setup_meck(Config),
    Mod = rabbit_stream_utils,
    ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
    try
        ok = rpc(Config, 0, meck, expect, [Mod, check_write_permitted, 2, error]),
        ok = rpc(Config, 0, meck, expect, [Mod, check_read_permitted, 3, error]),

        C7 = expect_successful_authentication(
               try_authenticate(T, S, C6, <<"PLAIN">>, <<"guest">>, <<"guest">>)),

        {Meta1, C8} = receive_commands(T, S, C7),
        {metadata_update, Stream1, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE} = Meta1,
        {Meta2, C9} = receive_commands(T, S, C8),
        {metadata_update, Stream2, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE} = Meta2,

        %% the order of the two updates is not asserted
        ImpactedStreams = #{Stream1 => ok, Stream2 => ok},
        ?assert(maps:is_key(PublishStream, ImpactedStreams)),
        ?assert(maps:is_key(ConsumeStream, ImpactedStreams)),

        _ = test_close(T, S, C9),
        closed = wait_for_socket_close(T, S, 10)
    after
        ok = rpc(Config, 0, meck, unload, [Mod])
    end,

    %% clean up from a fresh connection, with the real permission checks
    {ok, S2} = T:connect("localhost", Port, Opts),
    D0 = rabbit_stream_core:init(0),
    D1 = test_peer_properties(T, S2, D0),
    D2 = test_authenticate(T, S2, D1),
    D3 = test_delete_stream(T, S2, PublishStream, D2, false),
    D4 = test_delete_stream(T, S2, ConsumeStream, D3, false),
    _ = test_close(T, S2, D4),
    closed = wait_for_socket_close(T, S2, 10),
    ok.
%% Checks that storing an offset requires read access to the stream:
%% offsets 42 and 43 are stored while user "test" has full permissions
%% (with and without an active subscription), offset 44 is sent after
%% read access has been removed and must never become visible.
store_offset_requires_read_access(Config) ->
    User = <<"test">>,
    rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    T = gen_tcp,
    {ok, S} = T:connect("localhost", get_port(T, Config), get_opts(T)),
    Initial = rabbit_stream_core:init(0),
    WithProperties = test_peer_properties(T, S, Initial),
    Authenticated = test_authenticate(T, S, WithProperties, User),
    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
    WithStream = test_create_stream(T, S, Stream, Authenticated),
    Subscribed = test_subscribe(T, S, 1, Stream, WithStream),

    %% store_offset should work because the subscription is still active
    Reference = <<"foo">>,
    ok = store_offset(T, S, Reference, Stream, 42),
    {FirstStored, AfterFirstQuery} =
        query_expected_offset(T, S, Subscribed, Reference, Stream, 42),
    ?assertEqual(42, FirstStored),

    Unsubscribed = test_unsubscribe(T, S, 1, AfterFirstQuery),

    %% store_offset should still work because the user has read access to the stream
    ok = store_offset(T, S, Reference, Stream, 43),
    {SecondStored, AfterSecondQuery} =
        query_expected_offset(T, S, Unsubscribed, Reference, Stream, 43),
    ?assertEqual(43, SecondStored),

    %% no read access anymore
    rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>,
                                             <<".*">>, <<".*">>, <<"foobar">>),
    %% this store_offset request will not work because no read access
    ok = store_offset(T, S, Reference, Stream, 44),

    %% providing read access back to be able to query_offset
    rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    %% we never get the offset from the last query_offset attempt
    {ThirdStored, AfterThirdQuery} =
        query_expected_offset(T, S, AfterSecondQuery, Reference, Stream, 44),
    ?assertMatch(timeout, ThirdStored),

    Deleted = test_delete_stream(T, S, Stream, AfterThirdQuery, true),
    test_close(T, S, Deleted),
    closed = wait_for_socket_close(T, S, 10),
    ok.
%% Checks the consumer offset and offset lag reported by
%% consumer_offset_info/2 for several offset specifications.
%%
%% * On an empty stream every specification must report `{0, 0}'.
%% * After publishing 10 messages in two batches, each specification
%%   is subscribed with in turn; some of them are expected to produce a
%%   delivery first, and each comes with its own check of the reported
%%   offset and lag.
offset_lag_calculation(Config) ->
    Name = atom_to_binary(?FUNCTION_NAME, utf8),
    Port = get_port(gen_tcp, Config),
    %% the same name is used for the connection and for the stream
    {ok, S, C0} = stream_test_utils:connect_pp(Port,
                                               #{<<"connection_name">> => Name}),
    {ok, C1} = stream_test_utils:create_stream(S, C0, Name),
    SubId = 1,
    TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,

    %% empty stream: offset and lag are 0 whatever the specification
    CheckOnEmptyStream =
        fun(OffsetSpec, Conn0) ->
                {ok, Conn1} = stream_test_utils:subscribe(S, Conn0, Name, SubId,
                                                          10, #{}, OffsetSpec),
                ?assertEqual({0, 0}, consumer_offset_info(Config, Name)),
                {ok, Conn2} = stream_test_utils:unsubscribe(S, Conn1, SubId),
                Conn2
        end,
    C2 = lists:foldl(CheckOnEmptyStream, C1,
                     [first, last, next, 0, 1_000, {timestamp, TheFuture}]),

    PubId = 1,
    {ok, C3} = stream_test_utils:declare_publisher(S, C2, Name, PubId),
    MessageCount = 10,
    Body = <<"hello">>,
    {ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
                                         lists:duplicate(MessageCount - 1, Body)),
    %% to make sure to have 2 chunks
    timer:sleep(200),
    {ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
    {ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
    NextOffset = MessageCount,

    %% {offset specification, whether a delivery is expected, check}
    Scenarios =
        [{first, true,
          fun(Offset, Lag) ->
                  ?assert(Offset >= 0, "first, at least one chunk consumed"),
                  ?assert(Lag > 0, "first, not all messages consumed")
          end},
         {last, true,
          fun(Offset, _Lag) ->
                  ?assert(Offset > 0, "offset expected for last")
          end},
         {next, false,
          fun(Offset, Lag) ->
                  ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
                  ?assert(Lag =:= 0, "next, offset lag should be 0")
          end},
         {0, true,
          fun(Offset, Lag) ->
                  ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
                  ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
          end},
         {1_000, false,
          fun(Offset, Lag) ->
                  ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
                  ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
          end},
         {{timestamp, TheFuture}, false,
          fun(Offset, Lag) ->
                  ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
                  ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
          end}],
    RunScenario =
        fun({OffsetSpec, ExpectDelivery, Check}, Conn0) ->
                {ok, Conn1} = stream_test_utils:subscribe(S, Conn0, Name, SubId,
                                                          1, #{}, OffsetSpec),
                Conn2 = case ExpectDelivery of
                            true ->
                                {{deliver, SubId, _}, AfterDelivery} =
                                    receive_commands(S, Conn1),
                                AfterDelivery;
                            _ ->
                                Conn1
                        end,
                {ReportedOffset, ReportedLag} = consumer_offset_info(Config, Name),
                Check(ReportedOffset, ReportedLag),
                {ok, Conn3} = stream_test_utils:unsubscribe(S, Conn2, SubId),
                Conn3
        end,
    C7 = lists:foldl(RunScenario, C6, Scenarios),

    {ok, C8} = stream_test_utils:delete_stream(S, C7, Name),
    {ok, _} = stream_test_utils:close(S, C8),
    ok.
%% A PLAIN authentication with a wrong password must be answered with
%% ?RESPONSE_AUTHENTICATION_FAILURE only after a delay, and the socket must
%% then be closed.
authentication_error_should_close_with_delay(Config) ->
    Transport = gen_tcp,
    StreamPort = get_port(Transport, Config),
    SocketOpts = get_opts(Transport),
    {ok, Socket} = Transport:connect("localhost", StreamPort, SocketOpts),
    Conn = test_peer_properties(Transport, Socket, rabbit_stream_core:init(0)),
    StartedAt = erlang:monotonic_time(millisecond),
    AuthResult = try_authenticate(Transport, Socket, Conn, <<"PLAIN">>,
                                  <<"guest">>, <<"wrong password">>),
    _ = expect_unsuccessful_authentication(AuthResult,
                                           ?RESPONSE_AUTHENTICATION_FAILURE),
    FinishedAt = erlang:monotonic_time(millisecond),
    %% the stream reader module defines the delay (3 seconds)
    ?assert(FinishedAt - StartedAt > 2_000),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% A user authenticated as "other" opens the "/" virtual host (see do_tune/3):
%% the open response must carry code 12 and arrive only after a delay, then
%% the socket must be closed.
%% NOTE(review): 12 is presumably the vhost-access-failure response code;
%% confirm against rabbit_stream.hrl.
unauthorized_vhost_access_should_close_with_delay(Config) ->
    Transport = gen_tcp,
    StreamPort = get_port(Transport, Config),
    SocketOpts = get_opts(Transport),
    {ok, Socket} = Transport:connect("localhost", StreamPort, SocketOpts),
    Conn0 = test_peer_properties(Transport, Socket, rabbit_stream_core:init(0)),
    Username = <<"other">>,
    Conn1 = sasl_handshake(Transport, Socket, Conn0),
    Conn2 = test_plain_sasl_authenticate(Transport, Socket, Conn1, Username),
    StartedAt = erlang:monotonic_time(millisecond),
    OpenResult = do_tune(Transport, Socket, Conn2),
    ?assertMatch({{response, _, {open, 12}}, _}, OpenResult),
    FinishedAt = erlang:monotonic_time(millisecond),
    %% the stream reader module defines the delay (3 seconds)
    ?assert(FinishedAt - StartedAt > 2_000),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Declaring a publisher with a 255-byte reference must succeed, while a
%% 256-byte reference must be rejected with ?RESPONSE_CODE_PRECONDITION_FAILED.
%%
%% The connection state returned by every helper is threaded through to the
%% next call, so the test does not depend on the parser state happening to be
%% equal before and after each exchange.
test_publisher_with_too_long_reference_errors(Config) ->
    FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
    T = gen_tcp,
    Port = get_port(T, Config),
    Opts = get_opts(T),
    {ok, S} = T:connect("localhost", Port, Opts),
    C0 = rabbit_stream_core:init(0),
    ConnectionName = FunctionName,
    C1 = test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C0),
    C2 = test_authenticate(T, S, C1),
    Stream = FunctionName,
    C3 = test_create_stream(T, S, Stream, C2),
    MaxSize = 255,
    ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
    ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),
    Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
             {2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],
    C4 = lists:foldl(
           fun({PubId, Ref, ExpectedResponseCode}, CIn) ->
                   F = request({declare_publisher, PubId, Ref, Stream}),
                   ok = T:send(S, F),
                   {Cmd, COut} = receive_commands(T, S, CIn),
                   %% ExpectedResponseCode is already bound, so it is
                   %% matched against the response, not rebound
                   ?assertMatch({response, 1,
                                 {declare_publisher, ExpectedResponseCode}},
                                Cmd),
                   COut
           end, C3, Tests),
    C5 = test_delete_stream(T, S, Stream, C4),
    _ = test_close(T, S, C5),
    ok.
%% Subscribing with a 255-byte consumer name must succeed, while a 256-byte
%% name must be rejected with ?RESPONSE_CODE_PRECONDITION_FAILED.
%%
%% The connection state returned by every helper is threaded through to the
%% next call, so the test does not depend on the parser state happening to be
%% equal before and after each exchange.
test_consumer_with_too_long_reference_errors(Config) ->
    FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
    T = gen_tcp,
    Port = get_port(T, Config),
    Opts = get_opts(T),
    {ok, S} = T:connect("localhost", Port, Opts),
    C0 = rabbit_stream_core:init(0),
    ConnectionName = FunctionName,
    C1 = test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C0),
    C2 = test_authenticate(T, S, C1),
    Stream = FunctionName,
    C3 = test_create_stream(T, S, Stream, C2),
    MaxSize = 255,
    ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
    ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),
    Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
             {2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],
    C4 = lists:foldl(
           fun({SubId, Ref, ExpectedResponseCode}, CIn) ->
                   F = request({subscribe, SubId, Stream, first, 1,
                                #{<<"name">> => Ref}}),
                   ok = T:send(S, F),
                   {Cmd, COut} = receive_commands(T, S, CIn),
                   %% ExpectedResponseCode is already bound, so it is
                   %% matched against the response, not rebound
                   ?assertMatch({response, 1,
                                 {subscribe, ExpectedResponseCode}},
                                Cmd),
                   COut
           end, C3, Tests),
    C5 = test_delete_stream(T, S, Stream, C4),
    _ = test_close(T, S, C5),
    ok.
%% Checks that subscribing emits a consumer_created event and unsubscribing a
%% consumer_deleted event, as collected by rabbit_list_test_event_handler
%% registered on rabbit_event (node index 0).
subscribe_unsubscribe_should_create_events(Config) ->
    Handler = rabbit_list_test_event_handler,
    rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, Handler),
    rabbit_ct_broker_helpers:rpc(Config, 0, gen_event, add_handler,
                                 [rabbit_event, Handler, []]),
    StreamName = atom_to_binary(?FUNCTION_NAME, utf8),
    Transport = gen_tcp,
    StreamPort = get_stream_port(Config),
    SocketOpts = get_opts(Transport),
    {ok, Socket} = Transport:connect("localhost", StreamPort, SocketOpts),
    Conn0 = rabbit_stream_core:init(0),
    Conn1 = test_peer_properties(Transport, Socket, Conn0),
    Conn2 = test_authenticate(Transport, Socket, Conn1),
    Conn3 = test_create_stream(Transport, Socket, StreamName, Conn2),
    %% nothing recorded before the subscription
    ?assertEqual([], filtered_events(Config, consumer_created)),
    ?assertEqual([], filtered_events(Config, consumer_deleted)),
    SubId = 42,
    Conn4 = test_subscribe(Transport, Socket, SubId, StreamName, Conn3),
    ?awaitMatch([{event, consumer_created, _, _, _}],
                filtered_events(Config, consumer_created), ?WAIT),
    ?assertEqual([], filtered_events(Config, consumer_deleted)),
    Conn5 = test_unsubscribe(Transport, Socket, SubId, Conn4),
    ?awaitMatch([{event, consumer_deleted, _, _, _}],
                filtered_events(Config, consumer_deleted), ?WAIT),
    rabbit_ct_broker_helpers:rpc(Config, 0, gen_event, delete_handler,
                                 [rabbit_event, Handler, []]),
    %% `false`: do not wait for a metadata update after the deletion
    Conn6 = test_delete_stream(Transport, Socket, StreamName, Conn5, false),
    _ = test_close(Transport, Socket, Conn6),
    closed = wait_for_socket_close(Transport, Socket, 10),
    ok.
%% Smoke test for the stream_test_utils helpers: connect, create a stream,
%% declare a publisher, publish one batch of 100 messages, then delete the
%% publisher and the stream and close the connection.
test_stream_test_utils(Config) ->
    StreamName = atom_to_binary(?FUNCTION_NAME, utf8),
    {ok, Socket, Conn0} = stream_test_utils:connect(Config, 0),
    {ok, Conn1} = stream_test_utils:create_stream(Socket, Conn0, StreamName),
    PubId = 42,
    {ok, Conn2} = stream_test_utils:declare_publisher(Socket, Conn1,
                                                      StreamName, PubId),
    BatchSize = 100,
    Batch = lists:duplicate(BatchSize, <<"m1">>),
    FirstSequence = 1,
    {ok, Conn3} = stream_test_utils:publish(Socket, Conn2, PubId,
                                            FirstSequence, Batch),
    {ok, Conn4} = stream_test_utils:delete_publisher(Socket, Conn3, PubId),
    {ok, Conn5} = stream_test_utils:delete_stream(Socket, Conn4, StreamName),
    {ok, _} = stream_test_utils:close(Socket, Conn5),
    ok.
%% A single-active-consumer subscription on a partition, made without the
%% "super-stream" property, followed by a second one for the same name that
%% does set "super-stream", must make the second subscription fail with
%% ?RESPONSE_CODE_PRECONDITION_FAILED.
sac_subscription_with_partition_index_conflict_should_return_error(Config) ->
    T = gen_tcp,
    App = <<"app-1">>,
    {ok, S, C0} = stream_test_utils:connect(Config, 0),
    Ss = atom_to_binary(?FUNCTION_NAME, utf8),
    Partition = unicode:characters_to_binary([Ss, <<"-0">>]),
    SsCreationFrame = request({create_super_stream, Ss, [Partition], [<<"0">>], #{}}),
    ok = T:send(S, SsCreationFrame),
    {Cmd1, C1} = receive_commands(T, S, C0),
    ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
                 Cmd1),
    %% first subscription: plain SAC on the partition
    SacSubscribeFrame = request({subscribe, 0, Partition,
                                 first, 1,
                                 #{<<"single-active-consumer">> => <<"true">>,
                                   <<"name">> => App}}),
    ok = T:send(S, SacSubscribeFrame),
    {Cmd2, C2} = receive_commands(T, S, C1),
    ?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}},
                 Cmd2),
    {Cmd3, C3} = receive_commands(T, S, C2),
    ?assertMatch({request,0,{consumer_update,0,true}},
                 Cmd3),
    %% second subscription: same name, but with the super stream property
    SsSubscribeFrame = request({subscribe, 1, Partition,
                                first, 1,
                                #{<<"super-stream">> => Ss,
                                  <<"single-active-consumer">> => <<"true">>,
                                  <<"name">> => App}}),
    ok = T:send(S, SsSubscribeFrame),
    {Cmd4, C4} = receive_commands(T, S, C3),
    ?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Cmd4),
    {ok, C5} = stream_test_utils:unsubscribe(S, C4, 0),
    SsDeletionFrame = request({delete_super_stream, Ss}),
    ok = T:send(S, SsDeletionFrame),
    %% bind a fresh variable: reusing C5 here would turn the binding into a
    %% match against the previous connection state
    {Cmd5, C6} = receive_commands(T, S, C5),
    ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
                 Cmd5),
    {ok, _} = stream_test_utils:close(S, C6),
    ok.
%% For each transport, the metadata response for a stream must first report
%% {<<"localhost">>, Port}; once the advertised host/port application
%% environment keys are set on node 0 it must report those values instead.
%%
%% The environment keys are reset in an `after` section so that a failing
%% assertion does not leave advertised hints behind for later test cases.
test_metadata_with_advertised_hints(Config) ->
    Transports = [gen_tcp, ssl],
    lists:foreach(
      fun(Transport) ->
              FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
              TransportBin = atom_to_binary(Transport, utf8),
              Stream = <<FunctionName/binary, <<"_">>/binary, TransportBin/binary>>,
              Port = get_port(Transport, Config),
              Opts = get_opts(Transport),
              {ok, S} = Transport:connect("localhost", Port, Opts),
              C0 = rabbit_stream_core:init(0),
              C1 = test_peer_properties(Transport, S, C0),
              C2 = test_authenticate(Transport, S, C1),
              C3 = test_create_stream(Transport, S, Stream, C2),
              %% queries the stream metadata and returns the first node of
              %% the stream's entry
              GetStreamNodes =
                  fun(Conn0) ->
                          MetadataFrame = request({metadata, [Stream]}),
                          ok = Transport:send(S, MetadataFrame),
                          {Cmd, Conn1} = receive_commands(Transport, S, Conn0),
                          {response, 1,
                           {metadata, _Nodes,
                            #{Stream := {Node = {_H, _P}, _Replicas}}}} = Cmd,
                          {Conn1, Node}
                  end,
              {C4, N1} = GetStreamNodes(C3),
              ?assertEqual({<<"localhost">>, Port}, N1),
              %% same random host generator as
              %% test_connection_properties_with_advertised_hints
              AdHost = rand_bin(),
              AdPort = rand:uniform(65535),
              {KH, KP} = case Transport of
                             gen_tcp ->
                                 {?K_AD_HOST, ?K_AD_PORT};
                             ssl ->
                                 {?K_AD_TLS_HOST, ?K_AD_TLS_PORT}
                         end,
              C5 = try
                       rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]),
                       rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]),
                       {CAfter, N2} = GetStreamNodes(C4),
                       ?assertEqual({AdHost, AdPort}, N2),
                       CAfter
                   after
                       rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]),
                       rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined])
                   end,
              _ = test_close(Transport, S, C5),
              closed = wait_for_socket_close(Transport, S, 10)
      end, Transports),
    ok.
%% The connection properties returned when opening a connection must contain
%% "advertised_host" and "advertised_port": <<"localhost">> and the listener
%% port by default, or the values of the advertised host/port application
%% environment keys once they are set on node 0.
%%
%% The environment keys are reset in an `after` section so that a failing
%% assertion does not leave advertised hints behind for later test cases.
test_connection_properties_with_advertised_hints(Config) ->
    TestFun =
        fun(Transport, ExpectedHost, ExpectedPort) ->
                Port = get_port(Transport, Config),
                Opts = get_opts(Transport),
                {ok, S} = Transport:connect("localhost", Port, Opts),
                C0 = rabbit_stream_core:init(0),
                C1 = test_peer_properties(Transport, S, C0),
                {CP, C2} = test_authenticate_with_conn_props(Transport, S, C1),
                %% the port is compared in its binary decimal form
                ExpectedPortBin = integer_to_binary(ExpectedPort),
                ?assertMatch(#{<<"advertised_host">> := ExpectedHost,
                               <<"advertised_port">> := ExpectedPortBin},
                             CP),
                _ = test_close(Transport, S, C2),
                closed = wait_for_socket_close(Transport, S, 10)
        end,
    TestFun(gen_tcp, <<"localhost">>, get_port(gen_tcp, Config)),
    TestFun(ssl, <<"localhost">>, get_port(ssl, Config)),
    lists:foreach(
      fun(Transport) ->
              AdHost = rand_bin(),
              AdPort = rand:uniform(65535),
              {KH, KP} = case Transport of
                             gen_tcp ->
                                 {?K_AD_HOST, ?K_AD_PORT};
                             ssl ->
                                 {?K_AD_TLS_HOST, ?K_AD_TLS_PORT}
                         end,
              try
                  rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, AdHost]),
                  rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, AdPort]),
                  TestFun(Transport, AdHost, AdPort)
              after
                  rpc(Config, 0, application, set_env, [rabbitmq_stream, KH, undefined]),
                  rpc(Config, 0, application, set_env, [rabbitmq_stream, KP, undefined])
              end
      end, [gen_tcp, ssl]),
    ok.
%% Returns the events of type EventType among those reported by
%% rabbit_list_test_event_handler (queried through rabbit_event on node
%% index 0). Elements that are not 5-tuples tagged `event` are dropped.
filtered_events(Config, EventType) ->
    AllEvents = rabbit_ct_broker_helpers:rpc(
                  Config, 0, gen_event, call,
                  [rabbit_event, rabbit_list_test_event_handler, get_events]),
    [Event || {event, Type, _, _, _} = Event <- AllEvents,
              Type =:= EventType].
%% Returns {Offset, Lag} for the consumer of the connection named
%% ConnectionName, as reported by list_consumer_info/2 run on node index 0.
%% Fails with badmatch unless exactly one consumer is reported.
consumer_offset_info(Config, ConnectionName) ->
    RequestedItems = [offset, offset_lag],
    [[{offset, Offset}, {offset_lag, Lag}]] =
        rpc(Config, 0, ?MODULE, list_consumer_info,
            [ConnectionName, RequestedItems]),
    {Offset, Lag}.
%% Finds the stream connection of vhost "/" whose client properties carry the
%% given "connection_name" and returns its consumers' info items.
%% Each connection process is asked for its properties with an `infos`
%% message; a connection that does not answer within one second is ignored.
%% Fails with badmatch unless exactly one connection matches.
list_consumer_info(ConnectionName, Infos) ->
    HasExpectedName =
        fun(Pid) ->
                Pid ! {infos, self()},
                receive
                    {Pid, #{<<"connection_name">> := ConnectionName}} ->
                        true;
                    {Pid, _ClientProperties} ->
                        false
                after 1000 ->
                        false
                end
        end,
    [ConnPid] = [Pid || Pid <- rabbit_stream:list(<<"/">>),
                        HasExpectedName(Pid)],
    rabbit_stream_reader:consumers_info(ConnPid, Infos).
%% Sends a store_offset command for Reference/Stream on the socket.
%% The frame is sent as-is (no request wrapper) and no response is read here.
store_offset(Transport, S, Reference, Stream, Offset) ->
    Command = {store_offset, Reference, Stream, Offset},
    ok = Transport:send(S, rabbit_stream_core:frame(Command)).
%% Polls query_offset/5 until it returns the Expected offset.
%% Returns {Expected, Conn} on success, or {timeout, Conn} once the attempts
%% (10 by default, 100 ms apart) are exhausted.
query_expected_offset(T, S, C, Reference, Stream, Expected) ->
    DefaultAttempts = 10,
    query_expected_offset(T, S, C, Reference, Stream, Expected,
                          DefaultAttempts).

query_expected_offset(_T, _S, C, _Reference, _Stream, _Expected, 0) ->
    {timeout, C};
query_expected_offset(T, S, C0, Reference, Stream, Expected, AttemptsLeft) ->
    case query_offset(T, S, C0, Reference, Stream) of
        {Expected, _Conn} = Found ->
            Found;
        {_OtherOffset, C1} ->
            timer:sleep(100),
            query_expected_offset(T, S, C1, Reference, Stream, Expected,
                                  AttemptsLeft - 1)
    end.
%% Sends a query_offset request for Reference/Stream and returns
%% {Offset, Conn} taken from the response; the response code is ignored.
query_offset(T, S, C0, Reference, Stream) ->
    ok = T:send(S, request({query_offset, Reference, Stream})),
    {Cmd, C1} = receive_commands(T, S, C0),
    {response, 1, {query_offset, _ResponseCode, Offset}} = Cmd,
    {Offset, C1}.
%% Size of the ?TABLE_CONSUMER ETS table on node index 0.
consumer_count(Config) ->
    Table = ?TABLE_CONSUMER,
    ets_count(Config, Table).
%% Size of the ?TABLE_PUBLISHER ETS table on node index 0.
publisher_count(Config) ->
    Table = ?TABLE_PUBLISHER,
    ets_count(Config, Table).
%% Returns the `size` item of ets:info(Table) evaluated on node index 0.
ets_count(Config, Table) ->
    TableInfo = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [Table]),
    rabbit_misc:pget(size, TableInfo).
%% Runs the "tests" make target in the suite's data directory, passing the
%% stream ports (plain and TLS) and node names of nodes 0 and 1, plus the
%% rabbitmqctl command, as make variables. Fails unless make returns {ok, _}.
java(Config) ->
    DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
    MakeArgs =
        ["tests",
         {"NODE1_STREAM_PORT=~b", [get_stream_port(Config, 0)]},
         {"NODE1_STREAM_PORT_TLS=~b", [get_stream_port_tls(Config, 0)]},
         {"NODE1_NAME=~tp", [get_node_name(Config, 0)]},
         {"NODE2_NAME=~tp", [get_node_name(Config, 1)]},
         {"NODE2_STREAM_PORT=~b", [get_stream_port(Config, 1)]},
         {"NODE2_STREAM_PORT_TLS=~b", [get_stream_port_tls(Config, 1)]},
         {"RABBITMQCTL=~tp", [get_rabbitmqctl(Config)]}],
    {ok, _} = rabbit_ct_helpers:make(Config, DataDir, MakeArgs).
%% Value of the `rabbitmqctl_cmd` key of the test configuration.
get_rabbitmqctl(Config) ->
    Key = rabbitmqctl_cmd,
    rabbit_ct_helpers:get_config(Config, Key).
%% `tcp_port_stream` node configuration value of the given node
%% (node index 0 when no node is given).
get_stream_port(Config) ->
    FirstNode = 0,
    get_stream_port(Config, FirstNode).

get_stream_port(Config, Node) ->
    Key = tcp_port_stream,
    rabbit_ct_broker_helpers:get_node_config(Config, Node, Key).
%% `tcp_port_stream_tls` node configuration value of the given node
%% (node index 0 when no node is given).
get_stream_port_tls(Config) ->
    FirstNode = 0,
    get_stream_port_tls(Config, FirstNode).

get_stream_port_tls(Config, Node) ->
    Key = tcp_port_stream_tls,
    rabbit_ct_broker_helpers:get_node_config(Config, Node, Key).
%% `nodename` node configuration value of the given node
%% (node index 0 when no node is given).
get_node_name(Config) ->
    FirstNode = 0,
    get_node_name(Config, FirstNode).

get_node_name(Config, Node) ->
    Key = nodename,
    rabbit_ct_broker_helpers:get_node_config(Config, Node, Key).
%% Returns the stream listener port of node index 0 for the given transport
%% (`gen_tcp` or `ssl`).
%% For `ssl`, the ssl application and its dependencies are started first;
%% a start failure now fails the caller here with a badmatch instead of
%% surfacing later as an unrelated connection error.
get_port(Transport, Config) ->
    case Transport of
        gen_tcp ->
            get_stream_port(Config);
        ssl ->
            {ok, _Started} = application:ensure_all_started(ssl),
            get_stream_port_tls(Config)
    end.
%% Socket options used to connect with the given transport: passive, binary
%% mode sockets, plus `{verify, verify_none}` for `ssl`.
get_opts(Transport) ->
    CommonOpts = [{active, false}, {mode, binary}],
    case Transport of
        gen_tcp ->
            CommonOpts;
        ssl ->
            CommonOpts ++ [{verify, verify_none}]
    end.
%% Connects to the stream listener on localhost, exchanges peer properties
%% and authenticates (see test_authenticate/3).
%% Returns {Socket, ConnectionState}.
connect_and_authenticate(Transport, Config) ->
    StreamPort = get_port(Transport, Config),
    SocketOpts = get_opts(Transport),
    {ok, Socket} = Transport:connect("localhost", StreamPort, SocketOpts),
    Conn0 = rabbit_stream_core:init(0),
    Conn1 = test_peer_properties(Transport, Socket, Conn0),
    Conn2 = test_authenticate(Transport, Socket, Conn1),
    {Socket, Conn2}.
%% Sends a SASL authenticate request with the given mechanism and returns the
%% raw {Response, Conn} without asserting on the outcome.
%% For every mechanism the payload is <<0, Username, 0, Password>>.
try_authenticate(Transport, S, C, AuthMethod, Username, Password) ->
    case AuthMethod of
        <<"PLAIN">> ->
            plain_sasl_authenticate(Transport, S, C, Username, Password);
        _OtherMechanism ->
            AuthBody = <<0:8, Username/binary, 0:8, Password/binary>>,
            sasl_authenticate(Transport, S, C, AuthMethod, AuthBody)
    end.
%% End-to-end scenario against the stream listener over Transport:
%% connect, authenticate, create Stream, publish two messages, consume them
%% with `deliver` frames, exchange command versions, consume them again with
%% `deliver_v2` frames, check the stream stats, then delete the stream and
%% close. Global and osiris counters are checked along the way.
test_server(Transport, Stream, Config) ->
QName = rabbit_misc:r(<<"/">>, queue, Stream),
Port = get_port(Transport, Config),
Opts = get_opts(Transport),
{ok, S} =
Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
PublisherId = 42,
%% the publisher global counter must go from 0 to 1 on declaration
?assertMatch(#{publishers := 0}, get_global_counters(Config)),
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
?awaitMatch(#{publishers := 1}, get_global_counters(Config), ?WAIT),
Body = <<"hello">>,
%% publish the same body twice, each publish waits for its confirm
C5 = test_publish_confirm(Transport, S, PublisherId, Body, C4),
C6 = test_publish_confirm(Transport, S, PublisherId, Body, C5),
SubscriptionId = 42,
%% the consumer global counter must go from 0 to 1 on subscription
?assertMatch(#{consumers := 0}, get_global_counters(Config)),
C7 = test_subscribe(Transport, S, SubscriptionId, Stream, C6),
?awaitMatch(#{consumers := 1}, get_global_counters(Config), ?WAIT),
CounterKeys = maps:keys(get_osiris_counters(Config)),
%% find the counter key for the subscriber
{value, SubKey} =
lists:search(fun ({rabbit_stream_reader, Q, Id, _}) ->
Q == QName andalso Id == SubscriptionId;
(_) ->
false
end,
CounterKeys),
%% one deliver frame per published message, at offsets 0 and 1
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
%% assert the counter key got removed after unsubscribe
?assertNot(maps:is_key(SubKey, get_osiris_counters(Config))),
%% exchange capabilities, which says we support deliver v2
%% the connection should adapt its deliver frame accordingly
C10 = test_exchange_command_versions(Transport, S, C9),
SubscriptionId2 = 43,
C11 = test_subscribe(Transport, S, SubscriptionId2, Stream, C10),
C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11),
C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12),
C14 = test_stream_stats(Transport, S, Stream, C13),
%% test_delete_stream/4 also waits for the metadata update of the deletion
C15 = test_delete_stream(Transport, S, Stream, C14),
_C16 = test_close(Transport, S, C15),
closed = wait_for_socket_close(Transport, S, 10),
ok.
%% Sends a peer_properties request (empty properties by default) and asserts
%% an OK response. Returns the updated connection state.
test_peer_properties(Transport, S, C0) ->
    NoProperties = #{},
    test_peer_properties(Transport, S, NoProperties, C0).

test_peer_properties(Transport, S, Properties, C0) ->
    ok = Transport:send(S, request({peer_properties, Properties})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1, {peer_properties, ?RESPONSE_CODE_OK, _}},
                 Cmd),
    C.
%% SASL handshake, PLAIN authentication as "guest" (password = username),
%% then tune/open. Returns the updated connection state.
test_authenticate(Transport, S, C0) ->
    C1 = sasl_handshake(Transport, S, C0),
    C2 = test_plain_sasl_authenticate(Transport, S, C1, <<"guest">>),
    tune(Transport, S, C2).
%% Same as test_authenticate/3, but returns {ConnectionProperties, Conn}
%% where ConnectionProperties come from the open response.
test_authenticate_with_conn_props(Transport, S, C0) ->
    C1 = sasl_handshake(Transport, S, C0),
    C2 = test_plain_sasl_authenticate(Transport, S, C1, <<"guest">>),
    tune_with_conn_props(Transport, S, C2).
%% SASL handshake, PLAIN authentication with the given credentials (the
%% password defaults to the username), then tune/open.
%% Returns the updated connection state.
test_authenticate(Transport, S, C0, Username) ->
    Password = Username,
    test_authenticate(Transport, S, C0, Username, Password).

test_authenticate(Transport, S, C0, Username, Password) ->
    C1 = sasl_handshake(Transport, S, C0),
    C2 = test_plain_sasl_authenticate(Transport, S, C1, Username, Password),
    tune(Transport, S, C2).
%% Sends a sasl_handshake request and asserts that the server advertises
%% exactly the AMQPLAIN, ANONYMOUS and PLAIN mechanisms (in any order).
%% Any other command fails the test. Returns the updated connection state.
sasl_handshake(Transport, S, C0) ->
    ok = Transport:send(S, request(sasl_handshake)),
    {Cmd, C1} = receive_commands(Transport, S, C0),
    case Cmd of
        {response, _, {sasl_handshake, ?RESPONSE_CODE_OK, Mechanisms}} ->
            ?assertEqual([<<"AMQPLAIN">>, <<"ANONYMOUS">>, <<"PLAIN">>],
                         lists:sort(Mechanisms));
        _ ->
            ct:fail("invalid cmd ~tp", [Cmd])
    end,
    C1.
%% Authenticates with the ANONYMOUS mechanism (empty payload) and asserts
%% success. Returns the updated connection state.
test_anonymous_sasl_authenticate(Transport, S, C) ->
    expect_successful_authentication(
      sasl_authenticate(Transport, S, C, <<"ANONYMOUS">>, <<>>)).
%% Authenticates with the PLAIN mechanism (the password defaults to the
%% username) and asserts success. Returns the updated connection state.
test_plain_sasl_authenticate(Transport, S, C1, Username) ->
    Password = Username,
    test_plain_sasl_authenticate(Transport, S, C1, Username, Password).

test_plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
    AuthResult = plain_sasl_authenticate(Transport, S, C1, Username, Password),
    expect_successful_authentication(AuthResult).
%% Sends a PLAIN SASL authenticate request whose payload is
%% <<0, Username, 0, Password>> and returns the raw {Response, Conn}.
plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
    AuthBody = <<0:8, Username/binary, 0:8, Password/binary>>,
    sasl_authenticate(Transport, S, C1, <<"PLAIN">>, AuthBody).
%% Asserts that a {Response, Conn} pair is an OK sasl_authenticate response
%% with correlation id 2. Returns Conn.
expect_successful_authentication({SaslAuth, C2}) ->
    ExpectedResponse = {response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}},
    ?assertEqual(ExpectedResponse, SaslAuth),
    C2.
%% Asserts that a {Response, Conn} pair is a sasl_authenticate response with
%% correlation id 2 carrying the ExpectedError code. Returns Conn.
expect_unsuccessful_authentication({SaslAuth, C2}, ExpectedError) ->
    ExpectedResponse = {response, 2, {sasl_authenticate, ExpectedError}},
    ?assertEqual(ExpectedResponse, SaslAuth),
    C2.
%% Sends a sasl_authenticate request (correlation id 2) for the given
%% mechanism and payload, and returns the next {Command, Conn} received.
sasl_authenticate(Transport, S, C1, AuthMethod, AuthBody) ->
    Frame = request(2, {sasl_authenticate, AuthMethod, AuthBody}),
    ok = Transport:send(S, Frame),
    receive_commands(Transport, S, C1).
%% Runs the tune/open exchange (do_tune/3) and requires an OK open response.
%% Returns the updated connection state.
tune(Transport, S, C2) ->
    {{response, _, {open, ?RESPONSE_CODE_OK, _ConnProps}}, C3} =
        do_tune(Transport, S, C2),
    C3.
%% Runs the tune/open exchange (do_tune/3) and requires an OK open response.
%% Returns {ConnectionProperties, Conn}, the properties being the third
%% element of the open response.
tune_with_conn_props(Transport, S, C2) ->
    {{response, _, {open, ?RESPONSE_CODE_OK, ConnProps}}, C3} =
        do_tune(Transport, S, C2),
    {ConnProps, C3}.
%% Expects the server's tune command with the default frame max and
%% heartbeat, answers it with the same frame max and a heartbeat of 0, then
%% requests to open virtual host "/" (correlation id 3).
%% Returns the raw {Command, Conn} received after the open request.
do_tune(Transport, S, C2) ->
    {Tune, C3} = receive_commands(Transport, S, C2),
    {tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,
    TuneResponse = {response, 0, {tune, ?DEFAULT_FRAME_MAX, 0}},
    ok = Transport:send(S, rabbit_stream_core:frame(TuneResponse)),
    ok = Transport:send(S, request(3, {open, <<"/">>})),
    receive_commands(Transport, S, C3).
%% Creates Stream with no arguments and asserts an OK response.
%% Returns the updated connection state.
test_create_stream(Transport, S, Stream, C0) ->
    ok = Transport:send(S, request({create_stream, Stream, #{}})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd),
    C.
%% Deletes Stream and asserts an OK response. When the last argument is
%% `true` (the default), also expects the metadata update announcing that
%% the stream is no longer available. Returns the updated connection state.
test_delete_stream(Transport, S, Stream, C0) ->
    ExpectMetadataUpdate = true,
    test_delete_stream(Transport, S, Stream, C0, ExpectMetadataUpdate).

test_delete_stream(Transport, S, Stream, C0, true) ->
    Deleted = do_test_delete_stream(Transport, S, Stream, C0),
    test_metadata_update_stream_deleted(Transport, S, Stream, Deleted);
test_delete_stream(Transport, S, Stream, C0, false) ->
    do_test_delete_stream(Transport, S, Stream, C0).
%% Sends a delete_stream request for Stream and asserts an OK response.
%% Returns the updated connection state.
do_test_delete_stream(Transport, S, Stream, C0) ->
    ok = Transport:send(S, request({delete_stream, Stream})),
    {Cmd, C1} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd),
    C1.
%% Expects the next command to be a metadata_update for Stream carrying
%% ?RESPONSE_CODE_STREAM_NOT_AVAILABLE. Returns the updated connection state.
test_metadata_update_stream_deleted(Transport, S, Stream, C0) ->
    {Update, C1} = receive_commands(Transport, S, C0),
    {metadata_update, Stream, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE} = Update,
    C1.
%% Declares a publisher on Stream (with an empty reference by default) and
%% asserts an OK response. Returns the updated connection state.
test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
    NoReference = <<>>,
    test_declare_publisher(Transport, S, PublisherId, NoReference, Stream, C0).

test_declare_publisher(Transport, S, PublisherId, Reference, Stream, C0) ->
    Command = {declare_publisher, PublisherId, Reference, Stream},
    ok = Transport:send(S, request(Command)),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1, {declare_publisher, ?RESPONSE_CODE_OK}},
                 Cmd),
    C.
%% Publishes a single message and asserts on the command received in return.
%%
%% Shorter arities default to the `publish` command, publishing sequence 1
%% and an expected `publish_confirm`. With `publish_v2` the message carries
%% the filter value <<"foo">> and the expected command may be
%% `publish_confirm` or `publish_error`.
%% Every variant returns the updated connection state.
test_publish_confirm(Transport, S, PublisherId, Body, C0) ->
    DefaultSequence = 1,
    test_publish_confirm(Transport, S, PublisherId, DefaultSequence, Body, C0).

test_publish_confirm(Transport, S, PublisherId, Sequence, Body, C0) ->
    test_publish_confirm(Transport, S, publish, PublisherId, Sequence, Body,
                         publish_confirm, C0).

test_publish_confirm(Transport, S, PublishCmd, PublisherId, Body,
                     ExpectedConfirmCommand, C0) ->
    DefaultSequence = 1,
    test_publish_confirm(Transport, S, PublishCmd, PublisherId,
                         DefaultSequence, Body, ExpectedConfirmCommand, C0).

test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId,
                     Sequence, Body,
                     ExpectedConfirmCommand, C0) ->
    Size = byte_size(Body),
    %% publishing id, then a simple entry: 0 bit, 31-bit size, body
    Message = <<Sequence:64, 0:1, Size:31, Body:Size/binary>>,
    ok = Transport:send(S, frame({PublishCmd, PublisherId, 1, [Message]})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({ExpectedConfirmCommand, PublisherId, [Sequence]}, Cmd),
    C;
test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId,
                     Sequence, Body,
                     ExpectedConfirmCommand, C0) ->
    Size = byte_size(Body),
    Filter = <<"foo">>,
    FilterSize = byte_size(Filter),
    %% same layout as `publish`, with a length-prefixed filter value
    %% inserted after the publishing id
    Message = <<Sequence:64, FilterSize:16, Filter:FilterSize/binary,
                0:1, Size:31, Body:Size/binary>>,
    ok = Transport:send(S, frame({PublishCmd, PublisherId, 1, [Message]})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    case ExpectedConfirmCommand of
        publish_confirm ->
            ?assertMatch({ExpectedConfirmCommand, PublisherId, [Sequence]}, Cmd);
        publish_error ->
            ?assertMatch({ExpectedConfirmCommand, PublisherId, _, [Sequence]}, Cmd)
    end,
    C.
%% Deletes the publisher PublisherId and asserts an OK response.
%% Returns the updated connection state.
test_delete_publisher(Transport, Socket, PublisherId, C0) ->
    ok = Transport:send(Socket, request({delete_publisher, PublisherId})),
    {Cmd, C} = receive_commands(Transport, Socket, C0),
    ?assertMatch({response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, Cmd),
    C.
%% Subscribes to Stream and asserts on the response code.
%%
%% Defaults of the shorter arities: subscription properties
%% #{<<"random">> => <<"thing">>}, expected code ?RESPONSE_CODE_OK,
%% offset specification 0 and a credit of 10.
%% Every variant returns the updated connection state.
test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
    DefaultProperties = #{<<"random">> => <<"thing">>},
    test_subscribe(Transport, S, SubscriptionId, Stream,
                   DefaultProperties, ?RESPONSE_CODE_OK, C0).

test_subscribe(Transport, S, SubscriptionId, Stream,
               SubscriptionProperties, ExpectedResponseCode, C0) ->
    DefaultOffsetSpec = 0,
    DefaultCredit = 10,
    test_subscribe(Transport, S, SubscriptionId, Stream,
                   DefaultOffsetSpec, DefaultCredit,
                   SubscriptionProperties, ExpectedResponseCode, C0).

test_subscribe(Transport, S, SubscriptionId, Stream, OffsetSpec, Credit,
               SubscriptionProperties, ExpectedResponseCode, C0) ->
    Command = {subscribe, SubscriptionId, Stream,
               OffsetSpec, Credit, SubscriptionProperties},
    ok = Transport:send(S, request(Command)),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd),
    C.
%% Unsubscribes SubscriptionId and asserts an OK response.
%% Returns the updated connection state.
test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->
    ok = Transport:send(Socket, request({unsubscribe, SubscriptionId})),
    {Cmd, C} = receive_commands(Transport, Socket, C0),
    ?assertMatch({response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, Cmd),
    C.
%% Expects a `deliver` command for SubscriptionId whose chunk matches a fixed
%% layout: the 64-bit field following the epoch must equal COffset and the
%% chunk must carry exactly one simple entry equal to Body.
%% NOTE(review): the unnamed leading fields (5, 0, 0, 1, 1) are presumably
%% the chunk magic, version, type, entry count and record count; confirm
%% against the osiris chunk format.
%% Returns the updated connection state.
test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
    {Delivery, C} = receive_commands(Transport, S, C0),
    {deliver, SubscriptionId, Chunk} = Delivery,
    ct:pal("test_deliver ~p", [Chunk]),
    %% COffset and Body are already bound, so they are matched, not rebound
    <<5:4/unsigned, 0:4/unsigned, 0:8, 1:16, 1:32,
      _Timestamp:64, _Epoch:64, COffset:64,
      _Crc:32, _DataLength:32, _TrailerLength:32, _ReservedBytes:32,
      0:1, BodySize:31/unsigned, Body:BodySize/binary>> = Chunk,
    C.
%% Expects a `deliver_v2` command for SubscriptionId (its committed offset is
%% ignored) whose chunk matches a fixed layout: the 64-bit field following
%% the epoch must equal COffset and the chunk must carry exactly one simple
%% entry equal to Body.
%% NOTE(review): the unnamed leading fields (5, 0, 0, 1, 1) are presumably
%% the chunk magic, version, type, entry count and record count; confirm
%% against the osiris chunk format.
%% Returns the updated connection state.
test_deliver_v2(Transport, S, SubscriptionId, COffset, Body, C0) ->
    {Delivery, C} = receive_commands(Transport, S, C0),
    {deliver_v2, SubscriptionId, _CommittedOffset, Chunk} = Delivery,
    ct:pal("test_deliver_v2 ~p", [Chunk]),
    %% COffset and Body are already bound, so they are matched, not rebound
    <<5:4/unsigned, 0:4/unsigned, 0:8, 1:16, 1:32,
      _Timestamp:64, _Epoch:64, COffset:64,
      _Crc:32, _DataLength:32, _TrailerLength:32, _ReservedBytes:32,
      0:1, BodySize:31/unsigned, Body:BodySize/binary>> = Chunk,
    C.
%% Announces support for `deliver` versions ?VERSION_1 to ?VERSION_2 and
%% asserts an OK response whose command list starts with a
%% declare_publisher entry. Returns the updated connection state.
test_exchange_command_versions(Transport, S, C0) ->
    SupportedCommands = [{deliver, ?VERSION_1, ?VERSION_2}],
    ok = Transport:send(S, request({exchange_command_versions,
                                    SupportedCommands})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1,
                  {exchange_command_versions, ?RESPONSE_CODE_OK,
                   [{declare_publisher, _, _} | _]}},
                 Cmd),
    C.
%% Requests the stats of Stream and asserts an OK response reporting
%% first_chunk_id 0 and committed_chunk_id 1.
%% Returns the updated connection state.
test_stream_stats(Transport, S, Stream, C0) ->
    ok = Transport:send(S, request({stream_stats, Stream})),
    {Cmd, C} = receive_commands(Transport, S, C0),
    ?assertMatch({response, 1,
                  {stream_stats, ?RESPONSE_CODE_OK,
                   #{<<"first_chunk_id">> := 0,
                     <<"committed_chunk_id">> := 1}}},
                 Cmd),
    C.
%% Sends a close request (code OK, reason <<"OK">>) and requires an OK close
%% response. Returns the updated connection state.
test_close(Transport, S, C0) ->
    Command = {close, ?RESPONSE_CODE_OK, <<"OK">>},
    ok = Transport:send(S, request(Command)),
    {{response, 1, {close, ?RESPONSE_CODE_OK}}, C} =
        receive_commands(Transport, S, C0),
    C.
%% Waits for the peer to close the socket, reading with a one-second timeout
%% up to Attempt times (transport defaults to gen_tcp).
%% Returns `closed` as soon as a read reports the socket closed, or
%% `not_closed` when every attempt timed out.
wait_for_socket_close(S, Attempt) ->
    wait_for_socket_close(gen_tcp, S, Attempt).

wait_for_socket_close(_Transport, _S, 0) ->
    not_closed;
wait_for_socket_close(Transport, S, AttemptsLeft) ->
    case Transport:recv(S, 0, 1000) of
        {error, closed} ->
            closed;
        {error, timeout} ->
            wait_for_socket_close(Transport, S, AttemptsLeft - 1)
    end.
%% Reads the next stream protocol command from the socket by delegating to
%% stream_test_utils:receive_stream_commands/3 (transport defaults to
%% gen_tcp).
receive_commands(S, C) ->
    DefaultTransport = gen_tcp,
    receive_commands(DefaultTransport, S, C).

receive_commands(Transport, S, C) ->
    stream_test_utils:receive_stream_commands(Transport, S, C).
%% Result of osiris_counters:overview/0 evaluated on node index 0.
get_osiris_counters(Config) ->
    rabbit_ct_broker_helpers:rpc(Config, 0, osiris_counters, overview, []).
%% Entry stored under the key #{protocol => stream} in the result of
%% rabbit_global_counters:overview/0 evaluated on node index 0.
get_global_counters(Config) ->
    Overview = rabbit_ct_broker_helpers:rpc(Config, 0,
                                            rabbit_global_counters,
                                            overview, []),
    maps:get(#{protocol => stream}, Overview).
%% Builds the frame of a request command; the correlation id defaults to 1.
request(Cmd) ->
    DefaultCorrelationId = 1,
    request(DefaultCorrelationId, Cmd).

request(CorrId, Cmd) ->
    Request = {request, CorrId, Cmd},
    rabbit_stream_core:frame(Request).
%% Returns the base64 encoding of 20 random bytes, as a binary.
rand_bin() ->
    RandomBytes = rand:bytes(20),
    base64:encode(RandomBytes).
%% Generates an osiris log in Directory through osiris_log:generate_log/4.
%% The message passed to it is an AMQP 1.0 data section whose content is
%% MsgSize bytes of "a", encoded with amqp10_framing:encode_bin/1.
generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) ->
    Payload = binary:copy(<<"a">>, MsgSize),
    EncodedMessage = amqp10_framing:encode_bin(#'v1_0.data'{content = Payload}),
    osiris_log:generate_log(EncodedMessage, MsgsPerChunk, NumMessages,
                            Directory).