Merge pull request #13119 from rabbitmq/stream-test-utils-fix-wait-for-confirms
Fix wait-for-confirms sequence in stream test utils
This commit is contained in:
commit
a51d8a5ec9
|
|
@ -40,24 +40,49 @@ connect(Config, Node) ->
|
||||||
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
|
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
|
||||||
{ok, Sock, C5}.
|
{ok, Sock, C5}.
|
||||||
|
|
||||||
|
close(Sock, C0) ->
|
||||||
|
CloseReason = <<"OK">>,
|
||||||
|
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
|
||||||
|
ok = gen_tcp:send(Sock, CloseFrame),
|
||||||
|
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
|
{ok, C1}.
|
||||||
|
|
||||||
create_stream(Sock, C0, Stream) ->
|
create_stream(Sock, C0, Stream) ->
|
||||||
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
|
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
|
||||||
ok = gen_tcp:send(Sock, CreateStreamFrame),
|
ok = gen_tcp:send(Sock, CreateStreamFrame),
|
||||||
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
{ok, C1}.
|
{ok, C1}.
|
||||||
|
|
||||||
|
delete_stream(Sock, C0, Stream) ->
|
||||||
|
DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
|
||||||
|
ok = gen_tcp:send(Sock, DeleteStreamFrame),
|
||||||
|
{{response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
|
{ok, C1}.
|
||||||
|
|
||||||
declare_publisher(Sock, C0, Stream, PublisherId) ->
|
declare_publisher(Sock, C0, Stream, PublisherId) ->
|
||||||
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
|
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
|
||||||
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
|
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
|
||||||
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
{ok, C1}.
|
{ok, C1}.
|
||||||
|
|
||||||
|
delete_publisher(Sock, C0, PublisherId) ->
|
||||||
|
DeletePublisherFrame = rabbit_stream_core:frame({request, 1, {delete_publisher, PublisherId}}),
|
||||||
|
ok = gen_tcp:send(Sock, DeletePublisherFrame),
|
||||||
|
{{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
|
{ok, C1}.
|
||||||
|
|
||||||
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
|
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
|
||||||
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
|
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
|
||||||
ok = gen_tcp:send(Sock, SubscribeFrame),
|
ok = gen_tcp:send(Sock, SubscribeFrame),
|
||||||
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
{ok, C1}.
|
{ok, C1}.
|
||||||
|
|
||||||
|
unsubscribe(Sock, C0, SubscriptionId) ->
|
||||||
|
UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}),
|
||||||
|
ok = gen_tcp:send(Sock, UnsubscribeFrame),
|
||||||
|
{{response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
|
||||||
|
{ok, C1}.
|
||||||
|
|
||||||
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
|
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
|
||||||
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
|
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
|
||||||
Messages = [simple_entry(Seq, P)
|
Messages = [simple_entry(Seq, P)
|
||||||
|
|
@ -68,8 +93,17 @@ publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
|
||||||
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
|
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
|
||||||
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
|
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
|
||||||
ok = gen_tcp:send(Sock, PublishFrame1),
|
ok = gen_tcp:send(Sock, PublishFrame1),
|
||||||
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
|
wait_for_confirms(Sock, C0, PublisherId, [], MsgCount).
|
||||||
{ok, SeqIds, C1}.
|
|
||||||
|
wait_for_confirms(_, C, _, Acc, 0) ->
|
||||||
|
{ok, Acc, C};
|
||||||
|
wait_for_confirms(S, C0, PublisherId, Acc, Remaining) ->
|
||||||
|
case receive_stream_commands(S, C0) of
|
||||||
|
{{publish_confirm, PublisherId, SeqIds}, C1} ->
|
||||||
|
wait_for_confirms(S, C1, PublisherId, Acc ++ SeqIds, Remaining - length(SeqIds));
|
||||||
|
{Frame, C1} ->
|
||||||
|
{unexpected_frame, Frame, C1}
|
||||||
|
end.
|
||||||
|
|
||||||
%% Streams contain AMQP 1.0 encoded messages.
|
%% Streams contain AMQP 1.0 encoded messages.
|
||||||
%% In this case, the AMQP 1.0 encoded message contains a single data section.
|
%% In this case, the AMQP 1.0 encoded message contains a single data section.
|
||||||
|
|
|
||||||
|
|
@ -744,10 +744,10 @@ exchange_names_metric(Config) ->
|
||||||
stream_pub_sub_metrics(Config) ->
|
stream_pub_sub_metrics(Config) ->
|
||||||
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
|
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
|
||||||
MsgPerBatch1 = 2,
|
MsgPerBatch1 = 2,
|
||||||
publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
|
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
|
||||||
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
|
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
|
||||||
MsgPerBatch2 = 3,
|
MsgPerBatch2 = 3,
|
||||||
publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
|
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
|
||||||
|
|
||||||
%% aggregated metrics
|
%% aggregated metrics
|
||||||
|
|
||||||
|
|
@ -770,6 +770,8 @@ stream_pub_sub_metrics(Config) ->
|
||||||
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
|
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
|
||||||
{#{vhost => "/", queue => Stream2}, [3]}],
|
{#{vhost => "/", queue => Stream2}, [3]}],
|
||||||
lists:sort(maps:to_list(MaxOffsetLag))),
|
lists:sort(maps:to_list(MaxOffsetLag))),
|
||||||
|
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
|
||||||
|
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
core_metrics_special_chars(Config) ->
|
core_metrics_special_chars(Config) ->
|
||||||
|
|
@ -839,8 +841,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
|
||||||
SubscriptionId = 97,
|
SubscriptionId = 97,
|
||||||
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
|
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
|
||||||
%% delivery of first batch of messages
|
%% delivery of first batch of messages
|
||||||
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
|
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
|
||||||
ok.
|
{ok, S, C7}.
|
||||||
|
|
||||||
|
dispose_stream_connection(Sock, C0, Stream) ->
|
||||||
|
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),
|
||||||
|
{_MetadataUpdateFrame, C2} = stream_test_utils:receive_stream_commands(Sock, C1),
|
||||||
|
{ok, _} = stream_test_utils:close(Sock, C2).
|
||||||
|
|
||||||
http_get(Config, ReqHeaders, CodeExp) ->
|
http_get(Config, ReqHeaders, CodeExp) ->
|
||||||
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
|
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
|
||||||
|
|
|
||||||
|
|
@ -339,15 +339,16 @@ publish_via_stream_protocol(Stream, Config) ->
|
||||||
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
|
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
|
||||||
|
|
||||||
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
|
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
|
||||||
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
|
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
|
||||||
|
|
||||||
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
|
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
|
||||||
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
|
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch),
|
||||||
|
|
||||||
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
|
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
|
||||||
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
|
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
|
||||||
Messages2 = [M10, M11],
|
Messages2 = [M10, M11],
|
||||||
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
|
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2),
|
||||||
|
{ok, _} = stream_test_utils:close(S, C6).
|
||||||
|
|
||||||
connection_config(Config) ->
|
connection_config(Config) ->
|
||||||
Host = ?config(rmq_hostname, Config),
|
Host = ?config(rmq_hostname, Config),
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,8 @@ groups() ->
|
||||||
sasl_anonymous,
|
sasl_anonymous,
|
||||||
test_publisher_with_too_long_reference_errors,
|
test_publisher_with_too_long_reference_errors,
|
||||||
test_consumer_with_too_long_reference_errors,
|
test_consumer_with_too_long_reference_errors,
|
||||||
subscribe_unsubscribe_should_create_events
|
subscribe_unsubscribe_should_create_events,
|
||||||
|
test_stream_test_utils
|
||||||
]},
|
]},
|
||||||
%% Run `test_global_counters` on its own so the global metrics are
|
%% Run `test_global_counters` on its own so the global metrics are
|
||||||
%% initialised to 0 for each testcase
|
%% initialised to 0 for each testcase
|
||||||
|
|
@ -1053,6 +1054,21 @@ subscribe_unsubscribe_should_create_events(Config) ->
|
||||||
closed = wait_for_socket_close(Transport, S, 10),
|
closed = wait_for_socket_close(Transport, S, 10),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
test_stream_test_utils(Config) ->
|
||||||
|
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
|
||||||
|
{ok, S, C0} = stream_test_utils:connect(Config, 0),
|
||||||
|
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
|
||||||
|
PublisherId = 42,
|
||||||
|
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
|
||||||
|
MsgPerBatch = 100,
|
||||||
|
Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
|
||||||
|
SequenceFrom1 = 1,
|
||||||
|
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),
|
||||||
|
{ok, C4} = stream_test_utils:delete_publisher(S, C3, PublisherId),
|
||||||
|
{ok, C5} = stream_test_utils:delete_stream(S, C4, Stream),
|
||||||
|
{ok, _} = stream_test_utils:close(S, C5),
|
||||||
|
ok.
|
||||||
|
|
||||||
filtered_events(Config, EventType) ->
|
filtered_events(Config, EventType) ->
|
||||||
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
|
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
gen_event,
|
gen_event,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue