rabbitmq_federation: use dynamic waits where possible

to address spurious failures in CI

Building upon recent changes and discussion with @michaelklishin
This commit is contained in:
Philip Kuryloski 2020-10-13 10:02:41 +02:00
parent 1aa2e059bb
commit 4323c643e7
5 changed files with 21 additions and 29 deletions

View File

@ -93,6 +93,7 @@ run_federated(Config) ->
rabbit_federation_test_util:with_ch(
Config,
fun(_) ->
timer:sleep(3000),
{stream, [Props]} = ?CMD:run([], Opts#{only_down => false}),
<<"upstream">> = proplists:get_value(upstream_queue, Props),
<<"fed.downstream">> = proplists:get_value(queue, Props),
@ -117,6 +118,7 @@ run_down_federated(Config) ->
rabbit_federation_test_util:with_ch(
Config,
fun(_) ->
timer:sleep(3000),
{stream, ManyProps} = ?CMD:run([], Opts#{only_down => false}),
Links = [{proplists:get_value(upstream, Props),
proplists:get_value(status, Props)}
@ -130,6 +132,7 @@ run_down_federated(Config) ->
rabbit_federation_test_util:with_ch(
Config,
fun(_) ->
timer:sleep(3000),
{stream, [Props]} = ?CMD:run([], Opts#{only_down => true}),
<<"broken-bunny">> = proplists:get_value(upstream, Props),
error = proplists:get_value(status, Props)

View File

@ -158,15 +158,17 @@ multiple_upstreams_pattern(Config) ->
multiple_downstreams(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>)
timer:sleep(2000),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, 3000),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, 3000)
end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
bidirectional(Config) ->
with_ch(Config,
fun (Ch) ->
publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>),
publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>),
timer:sleep(2000),
publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, 3000),
publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, 3000),
Seq = lists:seq(1, 100),
[publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
[publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
@ -180,7 +182,8 @@ bidirectional(Config) ->
dynamic_reconfiguration(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
timer:sleep(2000),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, 3000),
%% Test that clearing connections works
clear_upstream(Config, 0, <<"localhost">>),
@ -199,8 +202,9 @@ dynamic_reconfiguration(Config) ->
federate_unfederate(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>),
timer:sleep(2000),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, 3000),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, 3000),
%% clear the policy
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),
@ -217,10 +221,11 @@ dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>,
with_ch(Config,
fun (Ch) ->
timer:sleep(2000),
UpQ = <<"upstream">>,
DownQ1 = <<"fed.downstream">>,
expect_federation(Ch, UpQ, DownQ1),
expect_federation(Ch, UpQ, DownQ2),
expect_federation(Ch, UpQ, DownQ1, 3000),
expect_federation(Ch, UpQ, DownQ2, 3000),
%% Disable the plugin, the link disappears
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),

View File

@ -85,6 +85,7 @@ queue_status(Config) ->
with_ch(
Config,
fun (_Ch) ->
timer:sleep(3000),
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
true = is_binary(proplists:get_value(id, Link))
@ -107,6 +108,7 @@ lookup_queue_status(Config) ->
with_ch(
Config,
fun (_Ch) ->
timer:sleep(3000),
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
Id = proplists:get_value(id, Link),
@ -120,6 +122,7 @@ lookup_bad_status(Config) ->
with_ch(
Config,
fun (_Ch) ->
timer:sleep(3000),
not_found = rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_federation_status, lookup, [<<"justmadeitup">>])

View File

@ -323,33 +323,13 @@ links(#'exchange.declare'{exchange = Name}) ->
end;
{error, not_found} ->
[]
end;
links(#'queue.declare'{queue = Name}) ->
case rabbit_amqqueue:lookup(qr(Name)) of
{ok, Q} ->
case rabbit_policy:get(<<"federation-upstream-set">>, Q) of
undefined ->
case rabbit_policy:get(<<"federation-upstream-pattern">>, Q) of
undefined -> [];
Regex ->
[{Name, U#upstream.name, U#upstream.queue_name} ||
U <- rabbit_federation_upstream:from_pattern(Regex, Q)]
end;
Set ->
[{Name, U#upstream.name, U#upstream.queue_name} ||
U <- rabbit_federation_upstream:from_set(Set, Q)]
end;
{error, not_found} ->
[]
end.
xr(Name) -> rabbit_misc:r(<<"/">>, exchange, Name).
qr(Name) -> rabbit_misc:r(<<"/">>, queue, Name).
with_ch(Config, Fun, Qs) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
declare_all(Ch, Qs),
timer:sleep(2000), %% Time for statuses to get updated
%% Clean up queues even after test failure.
try
Fun(Ch)

View File

@ -81,6 +81,7 @@ run(Config) ->
rabbit_federation_test_util:with_ch(
Config,
fun(_) ->
timer:sleep(3000),
[Link | _] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
Id = proplists:get_value(id, Link),