diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 879806659e..de74df27f2 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -447,7 +447,27 @@ binding_recovery(Config) -> rabbit_ct_broker_helpers:set_parameter(Config, Rabbit, <<"federation-upstream-set">>, <<"upstream">>, [[{<<"upstream">>, <<"rabbit">>}, {<<"exchange">>, <<"upstream">>}]]), - wait_for_federation(360, Config, Rabbit, <<"/">>), + wait_for_federation( + 30, + fun() -> + VHost = <<"/">>, + Xs = rabbit_ct_broker_helpers:rpc( + Config, Rabbit, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- Xs, + X#exchange.type =:= 'x-federation-upstream'], + L2 = + [X || X <- Xs, + X#exchange.name =:= #resource{virtual_host = VHost, + kind = exchange, + name = <<"fed.downstream">>}, + X#exchange.scratches =:= [{federation, + [{{<<"rabbit">>, + <<"upstream">>}, + <<"A">>}]}]], + [] =/= L1 andalso [] =/= L2 andalso + has_internal_federated_queue(Config, Rabbit, VHost) + end), publish_expect(Ch3, <<"upstream">>, <<"key">>, Q, <<"HELLO">>), true = (none =/= suffix(Config, Rabbit, <<"rabbit">>, "upstream")), @@ -940,16 +960,23 @@ dynamic_policy_cleanup(Config) -> assert_connections(Config, 0, [X1], []) end, [x(X1)]). -wait_for_federation(0, _, Node, VHost) -> - throw({timeout_while_waiting_for_federation, Node, VHost}); -wait_for_federation(N, Config, Node, VHost) -> - case has_internal_federated_exchange(Config, Node, VHost) andalso - has_internal_federated_queue(Config, Node, VHost) of +wait_for_federation(Retries, Config, Node, VHost) -> + wait_for_federation( + Retries, + fun() -> + has_internal_federated_exchange(Config, Node, VHost) andalso + has_internal_federated_queue(Config, Node, VHost) + end). + +wait_for_federation(Retries, Fun) -> + case Fun() of true -> ok; + false when Retries > 0 -> + timer:sleep(1000), + wait_for_federation(Retries - 1, Fun); false -> - timer:sleep(500), - wait_for_federation(N - 1, Config, Node, VHost) + throw({timeout_while_waiting_for_federation, Fun}) end. has_internal_federated_exchange(Config, Node, VHost) ->