Local shovel: more tests

This commit is contained in:
Diana Parra Corbacho 2025-07-29 10:47:02 +02:00
parent 414da5d146
commit 78167f0fb7
1 changed files with 64 additions and 6 deletions

View File

@ -68,8 +68,15 @@ groups() ->
local_to_local_delete_dest_queue,
local_to_local_vhost_access,
local_to_local_user_access,
local_to_local_credit_flow,
local_to_local_stream_credit_flow
local_to_local_credit_flow_on_confirm,
local_to_local_credit_flow_on_publish,
local_to_local_credit_flow_no_ack,
local_to_local_quorum_credit_flow_on_confirm,
local_to_local_quorum_credit_flow_on_publish,
local_to_local_quorum_credit_flow_no_ack,
local_to_local_stream_credit_flow_on_confirm,
local_to_local_stream_credit_flow_on_publish,
local_to_local_stream_credit_flow_no_ack
]}
].
@ -930,7 +937,16 @@ local_to_local_user_access(Config) ->
none]),
shovel_test_utils:await_no_shovel(Config, ?PARAM).
local_to_local_credit_flow(Config) ->
local_to_local_credit_flow_on_confirm(Config) ->
local_to_local_credit_flow(Config, <<"on-confirm">>).
local_to_local_credit_flow_on_publish(Config) ->
local_to_local_credit_flow(Config, <<"on-publish">>).
local_to_local_credit_flow_no_ack(Config) ->
local_to_local_credit_flow(Config, <<"no-ack">>).
local_to_local_credit_flow(Config, AckMode) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
with_session(Config,
@ -939,13 +955,53 @@ local_to_local_credit_flow(Config) ->
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-queue">>, Dest}
{<<"dest-queue">>, Dest},
{<<"ack-mode">>, AckMode}
]),
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
expect_many(Sess, Dest, 500)
end).
local_to_local_stream_credit_flow(Config) ->
local_to_local_quorum_credit_flow_on_confirm(Config) ->
local_to_local_quorum_credit_flow(Config, <<"on-confirm">>).
local_to_local_quorum_credit_flow_on_publish(Config) ->
local_to_local_quorum_credit_flow(Config, <<"on-publish">>).
local_to_local_quorum_credit_flow_no_ack(Config) ->
local_to_local_quorum_credit_flow(Config, <<"no-ack">>).
local_to_local_quorum_credit_flow(Config, AckMode) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
VHost = <<"/">>,
declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
with_session(Config,
fun (Sess) ->
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"src-predeclared">>, true},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-queue">>, Dest},
{<<"dest-predeclared">>, true},
{<<"ack-mode">>, AckMode}
]),
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
expect_many(Sess, Dest, 500)
end).
local_to_local_stream_credit_flow_on_confirm(Config) ->
local_to_local_stream_credit_flow(Config, <<"on-confirm">>).
local_to_local_stream_credit_flow_on_publish(Config) ->
local_to_local_stream_credit_flow(Config, <<"on-publish">>).
local_to_local_stream_credit_flow_no_ack(Config) ->
local_to_local_stream_credit_flow(Config, <<"no-ack">>).
local_to_local_stream_credit_flow(Config, AckMode) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
VHost = <<"/">>,
@ -959,7 +1015,8 @@ local_to_local_stream_credit_flow(Config) ->
{<<"src-predeclared">>, true},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-queue">>, Dest},
{<<"dest-predeclared">>, true}
{<<"dest-predeclared">>, true},
{<<"ack-mode">>, AckMode}
]),
Receiver = subscribe(Sess, Dest),
@ -972,6 +1029,7 @@ local_to_local_stream_credit_flow(Config) ->
amqp10_client:detach_link(Receiver)
end).
%%----------------------------------------------------------------------------
with_session(Config, Fun) ->
with_session(Config, <<"/">>, Fun).