Fix wait-for-confirms sequence in stream test utils
Also refine the implementation and its usage.
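For orientation, a minimal usage sketch of the utilities touched here (the helper names are the ones in this diff; stream name, publisher id and payloads are made up). With the fixed sequence, publish/5 only returns once wait_for_confirms/5 has collected a confirm for every published message, even if the broker spreads the confirms over several publish_confirm frames:

{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, <<"s">>),
{ok, C2} = stream_test_utils:declare_publisher(S, C1, <<"s">>, _PublisherId = 1),
%% 50 payloads may be confirmed in one frame or several; the helper now
%% loops until all 50 sequence ids have been confirmed before returning.
{ok, C3} = stream_test_utils:publish(S, C2, 1, _FirstSeq = 1, lists:duplicate(50, <<"m">>)),
{ok, C4} = stream_test_utils:delete_publisher(S, C3, 1),
{ok, C5} = stream_test_utils:delete_stream(S, C4, <<"s">>),
{ok, _} = stream_test_utils:close(S, C5).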
@@ -40,24 +40,49 @@ connect(Config, Node) ->
     {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
     {ok, Sock, C5}.
 
+close(Sock, C0) ->
+    CloseReason = <<"OK">>,
+    CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
+    ok = gen_tcp:send(Sock, CloseFrame),
+    {{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
 create_stream(Sock, C0, Stream) ->
     CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
     ok = gen_tcp:send(Sock, CreateStreamFrame),
     {{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
     {ok, C1}.
 
+delete_stream(Sock, C0, Stream) ->
+    DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
+    ok = gen_tcp:send(Sock, DeleteStreamFrame),
+    {{response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
 declare_publisher(Sock, C0, Stream, PublisherId) ->
     DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
     ok = gen_tcp:send(Sock, DeclarePublisherFrame),
     {{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
     {ok, C1}.
 
+delete_publisher(Sock, C0, PublisherId) ->
+    DeletePublisherFrame = rabbit_stream_core:frame({request, 1, {delete_publisher, PublisherId}}),
+    ok = gen_tcp:send(Sock, DeletePublisherFrame),
+    {{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
     SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
     {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
     {ok, C1}.
 
+unsubscribe(Sock, C0, SubscriptionId) ->
+    UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}),
+    ok = gen_tcp:send(Sock, UnsubscribeFrame),
+    {{response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
 publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
     SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
     Messages = [simple_entry(Seq, P)
@@ -68,8 +93,17 @@ publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
 publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
     PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
     ok = gen_tcp:send(Sock, PublishFrame1),
-    {{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
-    {ok, SeqIds, C1}.
+    wait_for_confirms(Sock, C0, PublisherId, [], MsgCount).
+
+wait_for_confirms(_, C, _, Acc, 0) ->
+    {ok, Acc, C};
+wait_for_confirms(S, C0, PublisherId, Acc, Remaining) ->
+    case receive_stream_commands(S, C0) of
+        {{publish_confirm, PublisherId, SeqIds}, C1} ->
+            wait_for_confirms(S, C1, PublisherId, Acc ++ SeqIds, Remaining - length(SeqIds));
+        {Frame, C1} ->
+            {unexpected_frame, Frame, C1}
+    end.
 
 %% Streams contain AMQP 1.0 encoded messages.
 %% In this case, the AMQP 1.0 encoded message contains a single data section.
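As background for the comment above: each plain entry is a publishing sequence id followed by one AMQP 1.0 encoded message holding a single data section. A sketch of such an encoder follows; it is illustrative only and not part of this diff, and it assumes the amqp10_framing encoder plus the stream protocol entry layout of a 64-bit sequence id, a 1-bit sub-batch flag and a 31-bit size:

simple_entry(Sequence, Body) when is_binary(Body) ->
    %% one AMQP 1.0 data section carrying the raw payload
    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
    DataSectSize = byte_size(DataSect),
    %% sequence id, "plain entry" flag bit (0), entry size, then the entry itself
    <<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.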
@@ -744,10 +744,10 @@ exchange_names_metric(Config) ->
 stream_pub_sub_metrics(Config) ->
     Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
     MsgPerBatch1 = 2,
-    publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
+    {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
     Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
     MsgPerBatch2 = 3,
-    publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
+    {ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
 
     %% aggregated metrics
@@ -770,6 +770,8 @@ stream_pub_sub_metrics(Config) ->
     ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
                   {#{vhost => "/", queue => Stream2}, [3]}],
                  lists:sort(maps:to_list(MaxOffsetLag))),
+    dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
+    dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
     ok.
 
 core_metrics_special_chars(Config) ->
@@ -839,8 +841,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
     SubscriptionId = 97,
     {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
     %% delivery of first batch of messages
-    {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
-    ok.
+    {{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
+    {ok, S, C7}.
+
+dispose_stream_connection(Sock, C0, Stream) ->
+    {ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),
+    {_MetadataUpdateFrame, C2} = stream_test_utils:receive_stream_commands(Sock, C1),
+    {ok, _} = stream_test_utils:close(Sock, C2).
 
 http_get(Config, ReqHeaders, CodeExp) ->
     Path = proplists:get_value(prometheus_path, Config, "/metrics"),
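A sketch of how the two helpers above are meant to compose (the stream name is made up; the helper names come from this diff): publish_via_stream_protocol/3 now hands the socket and protocol state back to the caller, so the testcase can tear the stream connection down after asserting on the metrics instead of leaking a connection per run:

{ok, S, C} = publish_via_stream_protocol(<<"some_stream">>, _MsgPerBatch = 2, Config),
%% ... scrape the per-object metrics and assert on the per-stream values here ...
dispose_stream_connection(S, C, <<"some_stream">>).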
@@ -339,15 +339,16 @@ publish_via_stream_protocol(Stream, Config) ->
     {ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
 
     UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
-    {ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
+    {ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
 
     CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
-    {ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
+    {ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch),
 
     M10 = stream_test_utils:simple_entry(6, <<"m10">>),
     M11 = stream_test_utils:simple_entry(7, <<"m11">>),
     Messages2 = [M10, M11],
-    {ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
+    {ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2),
+    {ok, _} = stream_test_utils:close(S, C6).
 
 connection_config(Config) ->
     Host = ?config(rmq_hostname, Config),
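Why the entry count for the sub-batches drops from 3 to 1: as this diff suggests, a sub-batch is a single publishing entry with a single sequence id, so the broker acknowledges it with one publish confirm even though it carries three messages, and wait_for_confirms/5 would keep waiting if told to expect three. Illustrative snippet reusing the names from the hunk above:

%% one entry (sequence id 4) carrying three messages => expect exactly one confirm
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
{ok, _ConfirmedSeqIds, _C} = stream_test_utils:publish_entries(S, C3, PublisherId, _EntryCount = 1, UncompressedSubbatch).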
@@ -67,7 +67,8 @@ groups() ->
        sasl_anonymous,
        test_publisher_with_too_long_reference_errors,
        test_consumer_with_too_long_reference_errors,
-       subscribe_unsubscribe_should_create_events
+       subscribe_unsubscribe_should_create_events,
+       test_stream_test_utils
       ]},
      %% Run `test_global_counters` on its own so the global metrics are
      %% initialised to 0 for each testcase
@@ -1053,6 +1054,21 @@ subscribe_unsubscribe_should_create_events(Config) ->
     closed = wait_for_socket_close(Transport, S, 10),
     ok.
 
+test_stream_test_utils(Config) ->
+    Stream = atom_to_binary(?FUNCTION_NAME, utf8),
+    {ok, S, C0} = stream_test_utils:connect(Config, 0),
+    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
+    PublisherId = 42,
+    {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
+    MsgPerBatch = 100,
+    Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
+    SequenceFrom1 = 1,
+    {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),
+    {ok, C4} = stream_test_utils:delete_publisher(S, C3, PublisherId),
+    {ok, C5} = stream_test_utils:delete_stream(S, C4, Stream),
+    {ok, _} = stream_test_utils:close(S, C5),
+    ok.
+
 filtered_events(Config, EventType) ->
     Events = rabbit_ct_broker_helpers:rpc(Config, 0,
                                           gen_event,