From b3b09400249bb5fa042733f0465fb433bb22ebeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 21 Jan 2025 17:38:58 +0100 Subject: [PATCH] Fix wait-for-confirms sequence in stream test utils And refine the implementation and its usage. --- .../src/stream_test_utils.erl | 38 ++++++++++++++++++- .../test/rabbit_prometheus_http_SUITE.erl | 15 ++++++-- .../test/protocol_interop_SUITE.erl | 7 ++-- .../test/rabbit_stream_SUITE.erl | 18 ++++++++- 4 files changed, 68 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl index 902edfab84..59cf8eb785 100644 --- a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl @@ -40,24 +40,49 @@ connect(Config, Node) -> {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4), {ok, Sock, C5}. +close(Sock, C0) -> + CloseReason = <<"OK">>, + CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}), + ok = gen_tcp:send(Sock, CloseFrame), + {{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), + {ok, C1}. + create_stream(Sock, C0, Stream) -> CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}), ok = gen_tcp:send(Sock, CreateStreamFrame), {{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), {ok, C1}. +delete_stream(Sock, C0, Stream) -> + DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}), + ok = gen_tcp:send(Sock, DeleteStreamFrame), + {{response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), + {ok, C1}. + declare_publisher(Sock, C0, Stream, PublisherId) -> DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}), ok = gen_tcp:send(Sock, DeclarePublisherFrame), {{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), {ok, C1}. +delete_publisher(Sock, C0, PublisherId) -> + DeletePublisherFrame = rabbit_stream_core:frame({request, 1, {delete_publisher, PublisherId}}), + ok = gen_tcp:send(Sock, DeletePublisherFrame), + {{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), + {ok, C1}. + subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) -> SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}), ok = gen_tcp:send(Sock, SubscribeFrame), {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), {ok, C1}. +unsubscribe(Sock, C0, SubscriptionId) -> + UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}), + ok = gen_tcp:send(Sock, UnsubscribeFrame), + {{response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), + {ok, C1}. + publish(Sock, C0, PublisherId, Sequence0, Payloads) -> SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1), Messages = [simple_entry(Seq, P) @@ -68,8 +93,17 @@ publish(Sock, C0, PublisherId, Sequence0, Payloads) -> publish_entries(Sock, C0, PublisherId, MsgCount, Messages) -> PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}), ok = gen_tcp:send(Sock, PublishFrame1), - {{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0), - {ok, SeqIds, C1}. + wait_for_confirms(Sock, C0, PublisherId, [], MsgCount). + +wait_for_confirms(_, C, _, Acc, 0) -> + {ok, Acc, C}; +wait_for_confirms(S, C0, PublisherId, Acc, Remaining) -> + case receive_stream_commands(S, C0) of + {{publish_confirm, PublisherId, SeqIds}, C1} -> + wait_for_confirms(S, C1, PublisherId, Acc ++ SeqIds, Remaining - length(SeqIds)); + {Frame, C1} -> + {unexpected_frame, Frame, C1} + end. %% Streams contain AMQP 1.0 encoded messages. %% In this case, the AMQP 1.0 encoded message contains a single data section. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 41ddc664fa..5b56eb1aba 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -744,10 +744,10 @@ exchange_names_metric(Config) -> stream_pub_sub_metrics(Config) -> Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", MsgPerBatch1 = 2, - publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), + {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", MsgPerBatch2 = 3, - publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), + {ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), %% aggregated metrics @@ -770,6 +770,8 @@ stream_pub_sub_metrics(Config) -> ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, {#{vhost => "/", queue => Stream2}, [3]}], lists:sort(maps:to_list(MaxOffsetLag))), + dispose_stream_connection(S1, C1, list_to_binary(Stream1)), + dispose_stream_connection(S2, C2, list_to_binary(Stream2)), ok. core_metrics_special_chars(Config) -> @@ -839,8 +841,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> SubscriptionId = 97, {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), %% delivery of first batch of messages - {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6), - ok. + {{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6), + {ok, S, C7}. + +dispose_stream_connection(Sock, C0, Stream) -> + {ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream), + {_MetadataUpdateFrame, C2} = stream_test_utils:receive_stream_commands(Sock, C1), + {ok, _} = stream_test_utils:close(Sock, C2). http_get(Config, ReqHeaders, CodeExp) -> Path = proplists:get_value(prometheus_path, Config, "/metrics"), diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 12f7d0e6e4..3db855f55e 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -339,15 +339,16 @@ publish_via_stream_protocol(Stream, Config) -> {ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1), UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]), - {ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch), + {ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch), CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]), - {ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch), + {ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch), M10 = stream_test_utils:simple_entry(6, <<"m10">>), M11 = stream_test_utils:simple_entry(7, <<"m11">>), Messages2 = [M10, M11], - {ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2). + {ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2), + {ok, _} = stream_test_utils:close(S, C6). connection_config(Config) -> Host = ?config(rmq_hostname, Config), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 9c2a0c1df1..c394f1bacb 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -67,7 +67,8 @@ groups() -> sasl_anonymous, test_publisher_with_too_long_reference_errors, test_consumer_with_too_long_reference_errors, - subscribe_unsubscribe_should_create_events + subscribe_unsubscribe_should_create_events, + test_stream_test_utils ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -1053,6 +1054,21 @@ subscribe_unsubscribe_should_create_events(Config) -> closed = wait_for_socket_close(Transport, S, 10), ok. +test_stream_test_utils(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME, utf8), + {ok, S, C0} = stream_test_utils:connect(Config, 0), + {ok, C1} = stream_test_utils:create_stream(S, C0, Stream), + PublisherId = 42, + {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId), + MsgPerBatch = 100, + Payloads = lists:duplicate(MsgPerBatch, <<"m1">>), + SequenceFrom1 = 1, + {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads), + {ok, C4} = stream_test_utils:delete_publisher(S, C3, PublisherId), + {ok, C5} = stream_test_utils:delete_stream(S, C4, Stream), + {ok, _} = stream_test_utils:close(S, C5), + ok. + filtered_events(Config, EventType) -> Events = rabbit_ct_broker_helpers:rpc(Config, 0, gen_event,