This commit is contained in:
Marcial Rosales 2024-07-24 09:26:34 +02:00 committed by Michael Klishin
parent e2e92d3214
commit 17e470e6eb
2 changed files with 13 additions and 10 deletions

View File

@ -94,11 +94,13 @@ init_source(Conf = #{ack_mode := AckMode,
NoAck = AckMode =:= no_ack, NoAck = AckMode =:= no_ack,
case NoAck of case NoAck of
false -> false ->
rabbit_log:debug("init_source. calling basic.qos ~p", [Prefetch]),
#'basic.qos_ok'{} = #'basic.qos_ok'{} =
amqp_channel:call(Chan, #'basic.qos'{prefetch_count = Prefetch}), amqp_channel:call(Chan, #'basic.qos'{prefetch_count = Prefetch}),
ok; ok;
true -> ok true -> ok
end, end,
rabbit_log:debug("init_source. calling remaining"),
Remaining = remaining(Chan, Conf), Remaining = remaining(Chan, Conf),
case Remaining of case Remaining of
0 -> 0 ->
@ -628,12 +630,10 @@ decl_fun(Decl, _Conn, Ch) ->
end || M <- lists:reverse(Decl)]. end || M <- lists:reverse(Decl)].
check_fun(Queue, _Conn, Ch) -> check_fun(Queue, _Conn, Ch) ->
try rabbit_log:debug("Checking if queue ~p exits", [Queue]),
amqp_channel:call(Ch, #'queue.declare'{queue = Queue, amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true}) passive = true}),
after rabbit_log:debug("Queue ~p exits", [Queue]).
catch amqp_channel:close(Ch)
end.
parse_parameter(Param, Fun, Value) -> parse_parameter(Param, Fun, Value) ->
try try

View File

@ -22,7 +22,8 @@
all() -> all() ->
[ [
{group, non_parallel_tests} {group, non_parallel_tests},
{group, with_predefined_topology}
]. ].
groups() -> groups() ->
@ -45,7 +46,9 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [ Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE} {rmq_nodename_suffix, ?MODULE},
{ignored_crashes,
["server_initiated_close,404"]}
]), ]),
rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_broker_helpers:setup_steps() ++
@ -225,7 +228,7 @@ valid_configuration(Config) ->
valid_configuration_with_predefined_resources(Config) -> valid_configuration_with_predefined_resources(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovels2, [Config]), ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovels2, [Config]),
run_valid_test2(Config), declare_queue(Config),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_running_shovel, [test_shovel]). ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_running_shovel, [test_shovel]).
run_valid_test(Config) -> run_valid_test(Config) ->
@ -290,7 +293,7 @@ run_valid_test(Config) ->
rabbit_ct_client_helpers:close_channel(Chan). rabbit_ct_client_helpers:close_channel(Chan).
run_valid_test2(Config) -> declare_queue(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Chan, #'queue.declare'{queue = ?QUEUE, amqp_channel:call(Chan, #'queue.declare'{queue = ?QUEUE,
durable = true}), durable = true}),