queue_SUITE: use a different upstream for each queue on multi-federation tests

This commit is contained in:
Diana Parra Corbacho 2024-10-29 08:52:24 +01:00
parent 444df0029b
commit 02bca637e1
4 changed files with 42 additions and 35 deletions

View File

@ -95,12 +95,12 @@ run_federated(Config) ->
timer:sleep(3000),
{stream, [Props]} = ?CMD:run([], Opts#{only_down => false}),
<<"upstream">> = proplists:get_value(upstream_queue, Props),
<<"fed.downstream">> = proplists:get_value(queue, Props),
<<"fed1.downstream">> = proplists:get_value(queue, Props),
<<"fed.tag">> = proplists:get_value(consumer_tag, Props),
running = proplists:get_value(status, Props)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]),
rabbit_federation_test_util:q(<<"fed1.downstream">>)]),
%% Down
rabbit_federation_test_util:with_ch(
Config,
@ -108,7 +108,7 @@ run_federated(Config) ->
{stream, []} = ?CMD:run([], Opts#{only_down => true})
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).
run_down_federated(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -128,7 +128,7 @@ run_down_federated(Config) ->
end, 15000)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]),
rabbit_federation_test_util:q(<<"fed1.downstream">>)]),
%% Down
rabbit_federation_test_util:with_ch(
Config,
@ -142,12 +142,12 @@ run_down_federated(Config) ->
end, 15000)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).
output_federated(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A},
Input = {stream,[[{queue, <<"fed.downstream">>},
Input = {stream,[[{queue, <<"fed1.downstream">>},
{consumer_tag, <<"fed.tag">>},
{upstream_queue, <<"upstream">>},
{type, queue},
@ -157,7 +157,7 @@ output_federated(Config) ->
{local_connection, <<"<rmq-ct-federation_status_command_SUITE-1-21000@localhost.1.563.0>">>},
{uri, <<"amqp://localhost:21000">>},
{timestamp, {{2016,11,21},{8,51,19}}}]]},
{stream, [#{queue := <<"fed.downstream">>,
{stream, [#{queue := <<"fed1.downstream">>,
upstream_queue := <<"upstream">>,
type := queue,
vhost := <<"/">>,

View File

@ -160,7 +160,7 @@ end_per_testcase(Testcase, Config) ->
simple(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>)
end, upstream_downstream(Config)).
multiple_upstreams_pattern(Config) ->
@ -200,9 +200,9 @@ multiple_downstreams(Config) ->
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).
message_flow(Config) ->
%% TODO: specifc source / target here
@ -236,11 +236,11 @@ dynamic_reconfiguration(Config) ->
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
%% Test that clearing connections works
clear_upstream(Config, 0, <<"localhost">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>),
%% Test that reading them and changing them works
set_upstream(Config, 0,
@ -249,7 +249,7 @@ dynamic_reconfiguration(Config) ->
URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]),
set_upstream(Config, 0, <<"localhost">>, URI),
set_upstream(Config, 0, <<"localhost">>, URI),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>)
end, upstream_downstream(Config)).
federate_unfederate(Config) ->
@ -257,37 +257,38 @@ federate_unfederate(Config) ->
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
%% clear the policy
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>),
expect_no_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>),
rabbit_ct_broker_helpers:set_policy(Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [
<<"fed">>, <<"^fed1\.">>, <<"all">>, [
{<<"federation-upstream-set">>, <<"upstream">>}])
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).
dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>,
DownQ2 = <<"fed2.downstream">>,
Args = ?config(target_queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
UpQ = <<"upstream">>,
DownQ1 = <<"fed.downstream">>,
expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
UpQ1 = <<"upstream">>,
UpQ2 = <<"upstream2">>,
DownQ1 = <<"fed1.downstream">>,
expect_federation(Ch, UpQ1, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, UpQ2, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
%% Disable the plugin, the link disappears
ct:pal("Stopping rabbitmq_federation"),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),
expect_no_federation(Ch, UpQ, DownQ1),
expect_no_federation(Ch, UpQ, DownQ2),
expect_no_federation(Ch, UpQ1, DownQ1),
expect_no_federation(Ch, UpQ2, DownQ2),
maybe_declare_queue(Config, Ch, q(DownQ1, Args)),
maybe_declare_queue(Config, Ch, q(DownQ2, Args)),
@ -305,12 +306,13 @@ dynamic_plugin_stop_start(Config) ->
Entry || Entry <- Status,
proplists:get_value(queue, Entry) =:= DownQ1 orelse
proplists:get_value(queue, Entry) =:= DownQ2,
proplists:get_value(upstream_queue, Entry) =:= UpQ,
proplists:get_value(upstream_queue, Entry) =:= UpQ1 orelse
proplists:get_value(upstream_queue, Entry) =:= UpQ2,
proplists:get_value(status, Entry) =:= running
],
length(L) =:= 2
end),
expect_federation(Ch, UpQ, DownQ1, 120000)
expect_federation(Ch, UpQ1, DownQ1, 120000)
end, upstream_downstream(Config) ++ [q(DownQ2, Args)]).
restart_upstream(Config) ->
@ -392,4 +394,4 @@ upstream_downstream() ->
upstream_downstream(Config) ->
SourceArgs = ?config(source_queue_args, Config),
TargetArgs = ?config(target_queue_args, Config),
[q(<<"upstream">>, SourceArgs), q(<<"fed.downstream">>, TargetArgs)].
[q(<<"upstream">>, SourceArgs), q(<<"fed1.downstream">>, TargetArgs)].

View File

@ -96,12 +96,17 @@ setup_federation_with_upstream_params(Config, ExtraParams) ->
rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed">>, <<"^fed\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
[<<"/">>, <<"fed">>, <<"^fed1\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
0, <<"all">>, <<"acting-user">>]),
rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed12">>, <<"^fed12\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}],
[<<"/">>, <<"fed2">>, <<"^fed2\.">>, [{<<"federation-upstream-set">>, <<"upstream2">>}],
0, <<"all">>, <<"acting-user">>]),
rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed12">>, <<"^fed3\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}],
2, <<"all">>, <<"acting-user">>]),
rabbit_ct_broker_helpers:set_policy(Config, 0,
@ -144,10 +149,10 @@ setup_down_federation(Config) ->
{<<"queue">>, <<"upstream">>}]]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
Config.
wait_for_federation(Retries, Fun) ->

View File

@ -87,7 +87,7 @@ run(Config) ->
ok = ?CMD:run([Id], Opts)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).
run_not_found(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),