local_dynamic_SUITE: await credit for publishing links

This elimiantes a race condition between the destination
granting the sender link credit and the rest of what
the test does.

Note: the amqp_utils module in server core cannot be easily
moved to, say, rabbit_ct_helpers because it combines
two kinds of helpers that belong to two of our
CT helper subprojects.

So we've copied two small functions from it for
the needs of this suite.
This commit is contained in:
Michael Klishin 2025-07-23 15:18:04 -07:00 committed by Diana Parra Corbacho
parent b5a2dac758
commit 30f67e0f7a
2 changed files with 33 additions and 8 deletions

View File

@ -14,6 +14,8 @@
-compile(export_all). -compile(export_all).
-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]).
-define(PARAM, <<"test">>). -define(PARAM, <<"test">>).
all() -> all() ->
@ -106,6 +108,7 @@ init_per_testcase(Testcase, Config0) ->
VHost = list_to_binary(atom_to_list(Testcase) ++ "_vhost"), VHost = list_to_binary(atom_to_list(Testcase) ++ "_vhost"),
Config = [{srcq, SrcQ}, {destq, DestQ}, {destq2, DestQ2}, Config = [{srcq, SrcQ}, {destq, DestQ}, {destq2, DestQ2},
{alt_vhost, VHost} | Config0], {alt_vhost, VHost} | Config0],
rabbit_ct_helpers:testcase_started(Config, Testcase). rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) ->
@ -953,6 +956,8 @@ publish(Sender, Tag, Payload) when is_binary(Payload) ->
Headers = #{durable => true}, Headers = #{durable => true},
Msg = amqp10_msg:set_headers(Headers, Msg = amqp10_msg:set_headers(Headers,
amqp10_msg:new(Tag, Payload, false)), amqp10_msg:new(Tag, Payload, false)),
%% N.B.: this function does not attach a link and does not
%% need to use await_credit/1
ok = amqp10_client:send_msg(Sender, Msg), ok = amqp10_client:send_msg(Sender, Msg),
receive receive
{amqp10_disposition, {accepted, Tag}} -> ok {amqp10_disposition, {accepted, Tag}} -> ok
@ -965,6 +970,7 @@ publish(Session, Source, Dest, Tag, Payloads) ->
{ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
unsettled, unsettled_state), unsettled, unsettled_state),
ok = await_amqp10_event(link, Sender, attached), ok = await_amqp10_event(link, Sender, attached),
ok = await_credit(Sender),
case is_list(Payloads) of case is_list(Payloads) of
true -> true ->
[publish(Sender, Tag, Payload) || Payload <- Payloads]; [publish(Sender, Tag, Payload) || Payload <- Payloads];
@ -981,13 +987,6 @@ publish_many(Session, Source, Dest, Tag, N) ->
Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)], Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)],
publish(Session, Source, Dest, Tag, Payloads). publish(Session, Source, Dest, Tag, Payloads).
await_amqp10_event(On, Ref, Evt) ->
receive
{amqp10_event, {On, Ref, Evt}} -> ok
after 5000 ->
exit({amqp10_event_timeout, On, Ref, Evt})
end.
expect_one(Session, Dest) -> expect_one(Session, Dest) ->
LinkName = <<"dynamic-receiver-", Dest/binary>>, LinkName = <<"dynamic-receiver-", Dest/binary>>,
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,

View File

@ -13,7 +13,8 @@
shovels_from_status/0, shovels_from_status/1, shovels_from_status/0, shovels_from_status/1,
get_shovel_status/2, get_shovel_status/3, get_shovel_status/2, get_shovel_status/3,
restart_shovel/2, restart_shovel/2,
await/1, await/2, clear_param/2, clear_param/3, make_uri/2, await/1, await/2, await_amqp10_event/3, await_credit/1,
clear_param/2, clear_param/3, make_uri/2,
make_uri/3, make_uri/5, make_uri/3, make_uri/5,
await_shovel1/4, await_no_shovel/2]). await_shovel1/4, await_no_shovel/2]).
@ -87,6 +88,31 @@ await_no_shovel(Config, Name) ->
ok ok
end. end.
flush(Prefix) ->
receive
Msg ->
ct:log("~p flushed: ~p~n", [Prefix, Msg]),
flush(Prefix)
after 1 ->
ok
end.
await_credit(Sender) ->
receive
{amqp10_event, {link, Sender, credited}} ->
ok
after 5_000 ->
flush("await_credit timed out"),
ct:fail(credited_timeout)
end.
await_amqp10_event(On, Ref, Evt) ->
receive
{amqp10_event, {On, Ref, Evt}} -> ok
after 5_000 ->
exit({amqp10_event_timeout, On, Ref, Evt})
end.
shovels_from_status() -> shovels_from_status() ->
shovels_from_status(running). shovels_from_status(running).