rabbitmq-server/deps/rabbit/test/priority_queue_SUITE.erl

623 lines
24 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(priority_queue_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(nowarn_export_all).
-compile(export_all).
all() ->
[
{group, single_node}
].
groups() ->
[
{single_node, [], [
ackfold,
drop,
{overflow_reject_publish, [], [reject]},
{overflow_reject_publish_dlx, [], [reject]},
dropwhile_fetchwhile,
info_head_message_timestamp,
info_backing_queue_version,
info_oldest_message_received_timestamp,
unknown_info_key,
matching,
purge,
requeue,
resume,
simple_order,
straight_through,
invoke,
gen_server2_stats,
negative_max_priorities,
max_priorities_above_hard_limit,
update_rates
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(single_node, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, 1},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_group(overflow_reject_publish, Config) ->
rabbit_ct_helpers:set_config(Config, [
{overflow, <<"reject-publish">>}
]);
init_per_group(overflow_reject_publish_dlx, Config) ->
rabbit_ct_helpers:set_config(Config, [
{overflow, <<"reject-publish-dlx">>}
]).
end_per_group(overflow_reject_publish, _Config) ->
ok;
end_per_group(overflow_reject_publish_dlx, _Config) ->
ok;
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_client_helpers:setup_steps(),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_client_helpers:teardown_steps(),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
%% The BQ API is used in all sorts of places in all sorts of
%% ways. Therefore we have to jump through a few different hoops
%% in order to integration-test it.
%%
%% * start/1, stop/0, init/3, terminate/2, delete_and_terminate/2
%% - starting and stopping rabbit. durable queues / persistent msgs needed
%% to test recovery
%%
%% * publish/5, drain_confirmed/1, fetch/2, ack/2, is_duplicate/2, msg_rates/1,
%% needs_timeout/1, timeout/1, invoke/3, resume/1 [0]
%% - regular publishing and consuming, with confirms and acks and durability
%%
%% * publish_delivered/4 - publish with acks straight through
%% * discard/3 - publish without acks straight through
%% * dropwhile/2 - expire messages without DLX
%% * fetchwhile/4 - expire messages with DLX
%% * ackfold/4 - reject messages with DLX
%% * requeue/2 - reject messages without DLX
%% * drop/2 - maxlen messages without DLX
%% * purge/1 - issue AMQP queue.purge
%% * purge_acks/1 - mirror queue explicit sync with unacked msgs
%% * fold/3 - mirror queue explicit sync
%% * depth/1 - mirror queue implicit sync detection
%% * len/1, is_empty/1 - info items
%% * handle_pre_hibernate/1 - hibernation
%%
%% * status/1
%% - maybe need unit testing?
%%
%% [0] publish enough to get credit flow from msg store
simple_order(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"simple_order-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
publish(Ch, Q, [2, 3, 1, 2, 3, 1, 2, 3, 1]),
get_all(Ch, Q, no_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
publish(Ch, Q, [3, 1, 2, 3, 1, 2, 3, 1, 2]),
get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
matching(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"matching-queue">>,
declare(Ch, Q, 5),
%% We round priority down, and 0 is the default
publish(Ch, Q, [undefined, 0, 5, 10, undefined]),
get_all(Ch, Q, do_ack, [5, 10, undefined, 0, undefined]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
resume(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"resume-queue">>,
declare(Ch, Q, 5),
amqp_channel:call(Ch, #'confirm.select'{}),
publish_many(Ch, Q, 10000),
amqp_channel:wait_for_confirms(Ch),
amqp_channel:call(Ch, #'queue.purge'{queue = Q}), %% Assert it exists
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
straight_through(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"straight_through-queue">>,
declare(Ch, Q, 3),
[begin
consume(Ch, Q, Ack),
[begin
publish1(Ch, Q, P),
assert_delivered(Ch, Ack, P)
end || P <- [1, 2, 3]],
cancel(Ch)
end || Ack <- [do_ack, no_ack]],
get_empty(Ch, Q),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
max_priorities_above_hard_limit(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"max_priorities_above_hard_limit">>,
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
%% Note that lower values (e.g. 300) will overflow the byte type here.
%% However, values >= 256 would still be rejected when used by
%% other clients
declare(Ch, Q, 3000)),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
negative_max_priorities(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"negative_max_priorities">>,
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(Ch, Q, -10)),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
invoke(Config) ->
%% Synthetic test to check the invoke callback, as the bug tested here
%% is only triggered with a race condition.
%% When mirroring is stopped, the backing queue of rabbit_amqqueue_process
%% changes from rabbit_mirror_queue_master to rabbit_priority_queue,
%% which shouldn't receive any invoke call. However, there might
%% be pending messages so the priority queue receives the
%% `run_backing_queue` cast message sent to the old master.
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"invoke-queue">>,
declare(Ch, Q, 3),
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
rabbit_ct_broker_helpers:rpc(
Config, A, gen_server, cast,
[Pid,
{run_backing_queue, ?MODULE, fun(_, _) -> ok end}]),
Pid2 = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
Pid = Pid2,
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
gen_server2_stats(Config) ->
%% Synthetic test to check the invoke callback, as the bug tested here
%% is only triggered with a race condition.
%% When mirroring is stopped, the backing queue of rabbit_amqqueue_process
%% changes from rabbit_mirror_queue_master to rabbit_priority_queue,
%% which shouldn't receive any invoke call. However, there might
%% be pending messages so the priority queue receives the
%% `run_backing_queue` cast message sent to the old master.
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"gen_server2_stats_queue">>,
declare(Ch, Q, 3),
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
Metrics = rabbit_ct_broker_helpers:rpc(
Config, A, rabbit_core_metrics, get_gen_server2_stats,
[Pid]),
true = is_number(Metrics),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
dropwhile_fetchwhile(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"dropwhile_fetchwhile-queue">>,
[begin
declare(Ch, Q, Args ++ arguments(3)),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
timer:sleep(10),
get_empty(Ch, Q),
delete(Ch, Q)
end ||
Args <- [[{<<"x-message-ttl">>, long, 1}],
[{<<"x-message-ttl">>, long, 1},
{<<"x-dead-letter-exchange">>, longstr, <<"amq.fanout">>}]
]],
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
ackfold(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"ackfolq-queue1">>,
Q2 = <<"ackfold-queue2">>,
declare(Ch, Q,
[{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, Q2}
| arguments(3)]),
declare(Ch, Q2, none),
publish(Ch, Q, [1, 2, 3]),
[_, _, DTag] = get_all(Ch, Q, manual_ack, [3, 2, 1]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
multiple = true,
requeue = false}),
timer:sleep(100),
get_all(Ch, Q2, do_ack, [3, 2, 1]),
delete(Ch, Q),
delete(Ch, Q2),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
requeue(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"requeue-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
[_, _, DTag] = get_all(Ch, Q, manual_ack, [3, 2, 1]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
multiple = true,
requeue = true}),
get_all(Ch, Q, do_ack, [3, 2, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
drop(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"drop-queue">>,
declare(Ch, Q, [{<<"x-max-length">>, long, 4} | arguments(3)]),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
%% We drop from the head, so this is according to the "spec" even
%% if not likely to be what the user wants.
get_all(Ch, Q, do_ack, [2, 1, 1, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
reject(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
XOverflow = ?config(overflow, Config),
Q = <<"reject-queue-", XOverflow/binary>>,
declare(Ch, Q, [{<<"x-max-length">>, long, 4},
{<<"x-overflow">>, longstr, XOverflow}
| arguments(3)]),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
%% First 4 messages are published, all others are discarded.
get_all(Ch, Q, do_ack, [3, 2, 1, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
purge(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"purge-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
amqp_channel:call(Ch, #'queue.purge'{queue = Q}),
get_empty(Ch, Q),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
info_head_message_timestamp(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, info_head_message_timestamp1, [Config]).
info_head_message_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_head_message_timestamp-queue">>),
ExName = rabbit_misc:r(<<"/">>, exchange, <<>>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(head_message_timestamp, BQS1),
%% Publish one message with timestamp 1000.
Content1 = #content{properties = #'P_basic'{priority = 1,
timestamp = 1000},
payload_fragments_rev = []},
{ok, Msg1} = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(), BQS1),
1000 = PQ:info(head_message_timestamp, BQS2),
%% Publish a higher priority message with no timestamp.
Content2 = #content{properties = #'P_basic'{priority = 2},
payload_fragments_rev = []},
{ok, Msg2} = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(), BQS2),
'' = PQ:info(head_message_timestamp, BQS3),
%% Consume message with no timestamp.
{{Msg2, _, _}, BQS4} = PQ:fetch(false, BQS3),
1000 = PQ:info(head_message_timestamp, BQS4),
%% Consume message with timestamp 1000, but do not acknowledge it
%% yet. The goal is to verify that the unacknowledged message's
%% timestamp is returned.
{{Msg1, _, AckTag}, BQS5} = PQ:fetch(true, BQS4),
1000 = PQ:info(head_message_timestamp, BQS5),
%% Ack message. The queue is empty now.
{[<<"msg1">>], BQS6} = PQ:ack([AckTag], BQS5),
true = PQ:is_empty(BQS6),
'' = PQ:info(head_message_timestamp, BQS6),
PQ:delete_and_terminate(a_whim, BQS6),
passed.
%% Because queue version is now ignored, this test is expected
%% to always get a queue version 2.
info_backing_queue_version(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q1 = <<"info-priority-queue-v1">>,
Q2 = <<"info-priority-queue-v2">>,
declare(Ch, Q1, [{<<"x-max-priority">>, byte, 3},
{<<"x-queue-version">>, byte, 1}]),
declare(Ch, Q2, [{<<"x-max-priority">>, byte, 3},
{<<"x-queue-version">>, byte, 2}]),
try
{ok, [{backing_queue_status, BQS1}]} = info(Config, Q1, [backing_queue_status]),
2 = proplists:get_value(version, BQS1),
{ok, [{backing_queue_status, BQS2}]} = info(Config, Q2, [backing_queue_status]),
2 = proplists:get_value(version, BQS2)
after
delete(Ch, Q1),
delete(Ch, Q2),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed
end.
info_oldest_message_received_timestamp(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, info_oldest_message_received_timestamp1, [Config]).
info_oldest_message_received_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_oldest_message_received_timestamp-queue">>),
ExName = rabbit_misc:r(<<"/">>, exchange, <<>>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(oldest_message_received_timestamp, BQS1),
%% Publish one message.
Content1 = #content{properties = #'P_basic'{priority = 1},
payload_fragments_rev = []},
{ok, Msg1} = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(),
BQS1),
Ts1 = PQ:info(oldest_message_received_timestamp, BQS2),
?assert(is_integer(Ts1)),
%% Publish a higher priority message.
Content2 = #content{properties = #'P_basic'{priority = 2},
payload_fragments_rev = []},
{ok, Msg2} = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(),
BQS2),
%% Even though is highest priority, the lower priority message is older.
%% Timestamp hasn't changed.
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS3)),
%% Consume message.
{{Msg2, _, _}, BQS4} = PQ:fetch(false, BQS3),
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS4)),
%% Consume the first message, but do not acknowledge it
%% yet. The goal is to verify that the unacknowledged message's
%% timestamp is returned.
{{Msg1, _, AckTag}, BQS5} = PQ:fetch(true, BQS4),
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS5)),
%% Ack message. The queue is empty now.
{[<<"msg1">>], BQS6} = PQ:ack([AckTag], BQS5),
true = PQ:is_empty(BQS6),
?assertEqual('', PQ:info(oldest_message_received_timestamp, BQS6)),
PQ:delete_and_terminate(a_whim, BQS6),
passed.
unknown_info_key(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"info-priority-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
{ok, [{pid, _Pid}, {unknown_key, ''}]} = info(Config, Q, [pid, unknown_key]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
update_rates(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"update_rates-queue">>,
declare(Ch, Q, [{<<"x-max-priority">>, byte, 3}]),
QPid = queue_pid(Config, Node, rabbit_misc:r(<<"/">>, queue, Q)),
try
publish1(Ch, Q, 1),
QPid ! update_rates,
State = get_state(Config, Q),
?assertEqual(live, State),
delete(Ch, Q)
after
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed
end.
%%----------------------------------------------------------------------------
declare(Ch, Q, Args) when is_list(Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
arguments = Args});
declare(Ch, Q, Max) ->
declare(Ch, Q, arguments(Max)).
delete(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
publish(Ch, Q, Ps) ->
amqp_channel:call(Ch, #'confirm.select'{}),
[publish1(Ch, Q, P) || P <- Ps],
amqp_channel:wait_for_confirms(Ch).
publish_payload(Ch, Q, PPds) ->
amqp_channel:call(Ch, #'confirm.select'{}),
[publish1(Ch, Q, P, Pd) || {P, Pd} <- PPds],
amqp_channel:wait_for_confirms(Ch).
publish_many(_Ch, _Q, 0) -> ok;
publish_many( Ch, Q, N) -> publish1(Ch, Q, rand:uniform(5)),
publish_many(Ch, Q, N - 1).
publish1(Ch, Q, P) ->
amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
#amqp_msg{props = props(P),
payload = priority2bin(P)}).
publish1(Ch, Q, P, Pd) ->
amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
#amqp_msg{props = props(P),
payload = Pd}).
props(undefined) -> #'P_basic'{delivery_mode = 2};
props(P) -> #'P_basic'{priority = P,
delivery_mode = 2}.
consume(Ch, Q, Ack) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
no_ack = Ack =:= no_ack,
consumer_tag = <<"ctag">>},
self()),
receive
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
ok
end.
cancel(Ch) ->
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}).
assert_delivered(Ch, Ack, P) ->
PBin = priority2bin(P),
receive
{#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} ->
PBin = PBin2,
maybe_ack(Ch, Ack, DTag)
end.
get_all(Ch, Q, Ack, Ps) ->
DTags = get_partial(Ch, Q, Ack, Ps),
get_empty(Ch, Q),
DTags.
get_partial(Ch, Q, Ack, Ps) ->
[get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps].
get_empty(Ch, Q) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
get_ok(Ch, Q, Ack, PBin) ->
{#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} =
amqp_channel:call(Ch, #'basic.get'{queue = Q,
no_ack = Ack =:= no_ack}),
?assertEqual(PBin, PBin2),
maybe_ack(Ch, Ack, DTag).
get_payload(Ch, Q, Ack, Ps) ->
[get_ok(Ch, Q, Ack, P) || P <- Ps].
get_without_ack(Ch, Q) ->
{#'basic.get_ok'{}, _} =
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false}).
maybe_ack(Ch, do_ack, DTag) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
DTag;
maybe_ack(_Ch, _, DTag) ->
DTag.
arguments(none) -> [];
arguments(Max) -> [{<<"x-max-priority">>, byte, Max}].
priority2bin(undefined) -> <<"undefined">>;
priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
%%----------------------------------------------------------------------------
queue_pid(Config, Nodename, Q) ->
Info = rabbit_ct_broker_helpers:rpc(
Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, pid]]),
[Pid] = [P || [{name, Q1}, {pid, P}] <- Info, Q =:= Q1],
Pid.
info(Config, Q, InfoKeys) ->
Nodename = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{ok, Amq} = rabbit_ct_broker_helpers:rpc(
Config, Nodename,
rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, Q)]),
Info = rabbit_ct_broker_helpers:rpc(
Config, Nodename,
rabbit_classic_queue, info, [Amq, InfoKeys]),
{ok, Info}.
get_state(Config, Q) ->
Nodename = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{ok, Amq} = rabbit_ct_broker_helpers:rpc(
Config, Nodename,
rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, Q)]),
rabbit_ct_broker_helpers:rpc(
Config, Nodename,
amqqueue, get_state, [Amq]).
%%----------------------------------------------------------------------------