diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index cdca22296d..1c8f90fffd 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -68,8 +68,15 @@ groups() -> local_to_local_delete_dest_queue, local_to_local_vhost_access, local_to_local_user_access, - local_to_local_credit_flow, - local_to_local_stream_credit_flow + local_to_local_credit_flow_on_confirm, + local_to_local_credit_flow_on_publish, + local_to_local_credit_flow_no_ack, + local_to_local_quorum_credit_flow_on_confirm, + local_to_local_quorum_credit_flow_on_publish, + local_to_local_quorum_credit_flow_no_ack, + local_to_local_stream_credit_flow_on_confirm, + local_to_local_stream_credit_flow_on_publish, + local_to_local_stream_credit_flow_no_ack ]} ]. @@ -930,7 +937,16 @@ local_to_local_user_access(Config) -> none]), shovel_test_utils:await_no_shovel(Config, ?PARAM). -local_to_local_credit_flow(Config) -> +local_to_local_credit_flow_on_confirm(Config) -> + local_to_local_credit_flow(Config, <<"on-confirm">>). + +local_to_local_credit_flow_on_publish(Config) -> + local_to_local_credit_flow(Config, <<"on-publish">>). + +local_to_local_credit_flow_no_ack(Config) -> + local_to_local_credit_flow(Config, <<"no-ack">>). + +local_to_local_credit_flow(Config, AckMode) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), with_session(Config, @@ -939,13 +955,53 @@ local_to_local_credit_flow(Config) -> [{<<"src-protocol">>, <<"local">>}, {<<"src-queue">>, Src}, {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} + {<<"dest-queue">>, Dest}, + {<<"ack-mode">>, AckMode} ]), publish_many(Sess, Src, Dest, <<"tag1">>, 500), expect_many(Sess, Dest, 500) end). -local_to_local_stream_credit_flow(Config) -> +local_to_local_quorum_credit_flow_on_confirm(Config) -> + local_to_local_quorum_credit_flow(Config, <<"on-confirm">>). + +local_to_local_quorum_credit_flow_on_publish(Config) -> + local_to_local_quorum_credit_flow(Config, <<"on-publish">>). + +local_to_local_quorum_credit_flow_no_ack(Config) -> + local_to_local_quorum_credit_flow(Config, <<"no-ack">>). + +local_to_local_quorum_credit_flow(Config, AckMode) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + VHost = <<"/">>, + declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-predeclared">>, true}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-queue">>, Dest}, + {<<"dest-predeclared">>, true}, + {<<"ack-mode">>, AckMode} + ]), + publish_many(Sess, Src, Dest, <<"tag1">>, 500), + expect_many(Sess, Dest, 500) + end). + +local_to_local_stream_credit_flow_on_confirm(Config) -> + local_to_local_stream_credit_flow(Config, <<"on-confirm">>). + +local_to_local_stream_credit_flow_on_publish(Config) -> + local_to_local_stream_credit_flow(Config, <<"on-publish">>). + +local_to_local_stream_credit_flow_no_ack(Config) -> + local_to_local_stream_credit_flow(Config, <<"no-ack">>). + +local_to_local_stream_credit_flow(Config, AckMode) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), VHost = <<"/">>, @@ -959,7 +1015,8 @@ local_to_local_stream_credit_flow(Config) -> {<<"src-predeclared">>, true}, {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest}, - {<<"dest-predeclared">>, true} + {<<"dest-predeclared">>, true}, + {<<"ack-mode">>, AckMode} ]), Receiver = subscribe(Sess, Dest), @@ -972,6 +1029,7 @@ local_to_local_stream_credit_flow(Config) -> amqp10_client:detach_link(Receiver) end). + %%---------------------------------------------------------------------------- with_session(Config, Fun) -> with_session(Config, <<"/">>, Fun).