diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl index 59cf8eb785..0c2f939ae1 100644 --- a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl @@ -77,6 +77,11 @@ subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) -> {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0), {ok, C1}. +credit(Sock, Subscription, Credit) -> + CreditFrame = rabbit_stream_core:frame({credit, Subscription, Credit}), + ok = gen_tcp:send(Sock, CreditFrame), + ok. + unsubscribe(Sock, C0, SubscriptionId) -> UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}), ok = gen_tcp:send(Sock, UnsubscribeFrame), @@ -149,20 +154,22 @@ sub_batch_entry_compressed(Sequence, Bodies) -> <>. + receive_stream_commands(Sock, C0) -> + receive_stream_commands(gen_tcp, Sock, C0). + +receive_stream_commands(Transport, Sock, C0) -> + receive_stream_commands(Transport, Sock, C0, 10). + +receive_stream_commands(_Transport, _Sock, C0, 0) -> + rabbit_stream_core:next_command(C0); +receive_stream_commands(Transport, Sock, C0, N) -> case rabbit_stream_core:next_command(C0) of empty -> - case gen_tcp:recv(Sock, 0, 5000) of + case Transport:recv(Sock, 0, 5000) of {ok, Data} -> C1 = rabbit_stream_core:incoming_data(Data, C0), - case rabbit_stream_core:next_command(C1) of - empty -> - {ok, Data2} = gen_tcp:recv(Sock, 0, 5000), - rabbit_stream_core:next_command( - rabbit_stream_core:incoming_data(Data2, C1)); - Res -> - Res - end; + receive_stream_commands(Transport, Sock, C1, N - 1); {error, Err} -> ct:fail("error receiving stream data ~w", [Err]) end; diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 5b56eb1aba..2b431401bc 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -839,7 +839,8 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2), SubscriptionId = 97, - {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), + {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0), + ok = stream_test_utils:credit(S, SubscriptionId, 1), %% delivery of first batch of messages {{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6), {ok, S, C7}. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index c394f1bacb..deade27bca 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -1569,26 +1569,8 @@ wait_for_socket_close(Transport, S, Attempt) -> closed end. -receive_commands(Transport, S, C0) -> - case rabbit_stream_core:next_command(C0) of - empty -> - case Transport:recv(S, 0, 5000) of - {ok, Data} -> - C1 = rabbit_stream_core:incoming_data(Data, C0), - case rabbit_stream_core:next_command(C1) of - empty -> - {ok, Data2} = Transport:recv(S, 0, 5000), - rabbit_stream_core:next_command( - rabbit_stream_core:incoming_data(Data2, C1)); - Res -> - Res - end; - {error, Err} -> - ct:fail("error receiving data ~w", [Err]) - end; - Res -> - Res - end. +receive_commands(Transport, S, C) -> + stream_test_utils:receive_stream_commands(Transport, S, C). get_osiris_counters(Config) -> rabbit_ct_broker_helpers:rpc(Config,