Move consumer timeout tests to own SUITE
Also handle case where client does not support consumer cancellation and rename the queue_cleanup timer to a generic "tick" timer for channels to perform periodic activities. [#164212469]
This commit is contained in:
parent
d389d045df
commit
ec76fef8a8
3
Makefile
3
Makefile
|
@ -126,7 +126,8 @@ define PROJECT_ENV
|
||||||
{vhost_restart_strategy, continue},
|
{vhost_restart_strategy, continue},
|
||||||
%% {global, prefetch count}
|
%% {global, prefetch count}
|
||||||
{default_consumer_prefetch, {false, 0}},
|
{default_consumer_prefetch, {false, 0}},
|
||||||
{channel_queue_cleanup_interval, 60000},
|
%% interval at which the channel can perform periodic actions
|
||||||
|
{channel_tick_interval, 60000},
|
||||||
%% Default max message size is 128 MB
|
%% Default max message size is 128 MB
|
||||||
{max_message_size, 134217728}
|
{max_message_size, 134217728}
|
||||||
]
|
]
|
||||||
|
|
|
@ -169,7 +169,7 @@
|
||||||
delivery_flow,
|
delivery_flow,
|
||||||
interceptor_state,
|
interceptor_state,
|
||||||
queue_states,
|
queue_states,
|
||||||
queue_cleanup_timer
|
tick_timer
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(QUEUE, lqueue).
|
-define(QUEUE, lqueue).
|
||||||
|
@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
||||||
end,
|
end,
|
||||||
MaxMessageSize = get_max_message_size(),
|
MaxMessageSize = get_max_message_size(),
|
||||||
ConsumerTimeout = get_consumer_timeout(),
|
ConsumerTimeout = get_consumer_timeout(),
|
||||||
rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]),
|
|
||||||
State = #ch{cfg = #conf{state = starting,
|
State = #ch{cfg = #conf{state = starting,
|
||||||
protocol = Protocol,
|
protocol = Protocol,
|
||||||
channel = Channel,
|
channel = Channel,
|
||||||
|
@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
||||||
rabbit_event:if_enabled(State2, #ch.stats_timer,
|
rabbit_event:if_enabled(State2, #ch.stats_timer,
|
||||||
fun() -> emit_stats(State2) end),
|
fun() -> emit_stats(State2) end),
|
||||||
put_operation_timeout(),
|
put_operation_timeout(),
|
||||||
State3 = init_queue_cleanup_timer(State2),
|
State3 = init_tick_timer(State2),
|
||||||
{ok, State3, hibernate,
|
{ok, State3, hibernate,
|
||||||
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
||||||
|
|
||||||
|
@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer},
|
||||||
[Channel, LateAnswer, Node]),
|
[Channel, LateAnswer, Node]),
|
||||||
noreply(State);
|
noreply(State);
|
||||||
|
|
||||||
handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
|
handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
|
||||||
consumer_timeout = Timeout},
|
capabilities = Capabilities,
|
||||||
queue_states = QueueStates0,
|
consumer_timeout = Timeout},
|
||||||
queue_names = QNames,
|
queue_states = QueueStates0,
|
||||||
queue_consumers = QCons,
|
queue_names = QNames,
|
||||||
unacked_message_q = UAMQ}) ->
|
queue_consumers = QCons,
|
||||||
|
unacked_message_q = UAMQ}) ->
|
||||||
QueueStates1 =
|
QueueStates1 =
|
||||||
maps:filter(fun(_, QS) ->
|
maps:filter(fun(_, QS) ->
|
||||||
QName = rabbit_quorum_queue:queue_name(QS),
|
QName = rabbit_quorum_queue:queue_name(QS),
|
||||||
|
@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
|
||||||
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
|
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
|
||||||
when is_integer(Timeout)
|
when is_integer(Timeout)
|
||||||
andalso Time < Now - Timeout ->
|
andalso Time < Now - Timeout ->
|
||||||
case ConsumerTag of
|
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
|
||||||
_ when is_integer(ConsumerTag) ->
|
"waiting on ack",
|
||||||
%% basic.get - there is no mechanims so we just crash the
|
[rabbit_data_coercion:to_binary(ConsumerTag),
|
||||||
%% channel
|
Channel]),
|
||||||
|
SupportsCancel = case rabbit_misc:table_lookup(
|
||||||
|
Capabilities,
|
||||||
|
<<"consumer_cancel_notify">>) of
|
||||||
|
{bool, true} when is_binary(ConsumerTag) ->
|
||||||
|
true;
|
||||||
|
_ -> false
|
||||||
|
end,
|
||||||
|
case SupportsCancel of
|
||||||
|
false ->
|
||||||
Ex = rabbit_misc:amqp_error(precondition_failed,
|
Ex = rabbit_misc:amqp_error(precondition_failed,
|
||||||
"basic.get ack timed out on channel ~w",
|
"consumer ack timed out on channel ~w",
|
||||||
[Channel], none),
|
[Channel], none),
|
||||||
handle_exception(Ex, State0);
|
handle_exception(Ex, State0);
|
||||||
% rabbit_misc:protocol_error(precondition_failed,
|
true ->
|
||||||
% "basic.get ack timed out on channel ~w ",
|
|
||||||
% [Channel]);
|
|
||||||
_ ->
|
|
||||||
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
|
|
||||||
"waiting on ack",
|
|
||||||
[rabbit_data_coercion:to_binary(ConsumerTag),
|
|
||||||
Channel]),
|
|
||||||
QRef = qpid_to_ref(QPid),
|
QRef = qpid_to_ref(QPid),
|
||||||
QName = maps:get(QRef, QNames),
|
QName = maps:get(QRef, QNames),
|
||||||
%% cancel the consumer with the client
|
%% cancel the consumer with the client
|
||||||
|
@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
|
||||||
?QUEUE:to_list(UAMQ)),
|
?QUEUE:to_list(UAMQ)),
|
||||||
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
|
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
|
||||||
self(), QueueStates2),
|
self(), QueueStates2),
|
||||||
|
|
||||||
State = State1#ch{queue_states = QueueStates,
|
State = State1#ch{queue_states = QueueStates,
|
||||||
queue_consumers = maps:remove(QRef, QCons),
|
queue_consumers = maps:remove(QRef, QCons),
|
||||||
unacked_message_q = Rem},
|
unacked_message_q = Rem},
|
||||||
noreply(init_queue_cleanup_timer(State))
|
noreply(init_tick_timer(State))
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
noreply(
|
noreply(
|
||||||
init_queue_cleanup_timer(
|
init_tick_timer(
|
||||||
State0#ch{queue_states = QueueStates1}))
|
State0#ch{queue_states = QueueStates1}))
|
||||||
end;
|
end;
|
||||||
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
|
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
|
||||||
|
@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName,
|
||||||
consumer_mapping = CMap}) ->
|
consumer_mapping = CMap}) ->
|
||||||
case rabbit_misc:table_lookup(
|
case rabbit_misc:table_lookup(
|
||||||
Capabilities, <<"consumer_cancel_notify">>) of
|
Capabilities, <<"consumer_cancel_notify">>) of
|
||||||
{bool, true} -> ok =
|
{bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
|
||||||
|
|
||||||
rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
|
|
||||||
send(#'basic.cancel'{consumer_tag = CTag,
|
|
||||||
nowait = true}, State);
|
nowait = true}, State);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
|
@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
|
||||||
State1 = track_delivering_queue(NoAck, QPid, QName, State),
|
State1 = track_delivering_queue(NoAck, QPid, QName, State),
|
||||||
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
|
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
|
||||||
|
|
||||||
init_queue_cleanup_timer(State) ->
|
init_tick_timer(State) ->
|
||||||
{ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
|
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
|
||||||
State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
|
State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
|
||||||
|
|
||||||
%% only classic queues need monitoring so rather than special casing
|
%% only classic queues need monitoring so rather than special casing
|
||||||
%% everywhere monitors are set up we wrap it here for this module
|
%% everywhere monitors are set up we wrap it here for this module
|
||||||
|
|
|
@ -0,0 +1,272 @@
|
||||||
|
%% The contents of this file are subject to the Mozilla Public License
|
||||||
|
%% Version 1.1 (the "License"); you may not use this file except in
|
||||||
|
%% compliance with the License. You may obtain a copy of the License at
|
||||||
|
%% https://www.mozilla.org/MPL/
|
||||||
|
%%
|
||||||
|
%% Software distributed under the License is distributed on an "AS IS"
|
||||||
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
||||||
|
%% License for the specific language governing rights and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% The Original Code is RabbitMQ.
|
||||||
|
%%
|
||||||
|
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||||
|
%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
|
||||||
|
%%
|
||||||
|
%%
|
||||||
|
-module(consumer_timeout_SUITE).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-define(TIMEOUT, 30000).
|
||||||
|
|
||||||
|
-import(quorum_queue_utils, [wait_for_messages/2]).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, parallel_tests}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
AllTests = [consumer_timeout,
|
||||||
|
consumer_timeout_basic_get,
|
||||||
|
consumer_timeout_no_basic_cancel_capability
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{parallel_tests, [],
|
||||||
|
[
|
||||||
|
{classic_queue, [parallel], AllTests},
|
||||||
|
{mirrored_queue, [parallel], AllTests},
|
||||||
|
{quorum_queue, [parallel], AllTests}
|
||||||
|
]}
|
||||||
|
].
|
||||||
|
|
||||||
|
suite() ->
|
||||||
|
[
|
||||||
|
{timetrap, {minutes, 3}}
|
||||||
|
].
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%% Testsuite setup/teardown.
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
rabbit_ct_helpers:log_environment(),
|
||||||
|
rabbit_ct_helpers:run_setup_steps(Config).
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
||||||
|
|
||||||
|
init_per_group(classic_queue, Config) ->
|
||||||
|
rabbit_ct_helpers:set_config(
|
||||||
|
Config,
|
||||||
|
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
|
||||||
|
{queue_durable, true}]);
|
||||||
|
init_per_group(quorum_queue, Config) ->
|
||||||
|
case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
|
||||||
|
ok ->
|
||||||
|
rabbit_ct_helpers:set_config(
|
||||||
|
Config,
|
||||||
|
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
|
||||||
|
{queue_durable, true}]);
|
||||||
|
Skip ->
|
||||||
|
Skip
|
||||||
|
end;
|
||||||
|
init_per_group(mirrored_queue, Config) ->
|
||||||
|
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
|
||||||
|
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
|
||||||
|
Config1 = rabbit_ct_helpers:set_config(
|
||||||
|
Config, [{is_mirrored, true},
|
||||||
|
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
|
||||||
|
{queue_durable, true}]),
|
||||||
|
rabbit_ct_helpers:run_steps(Config1, []);
|
||||||
|
init_per_group(Group, Config0) ->
|
||||||
|
case lists:member({group, Group}, all()) of
|
||||||
|
true ->
|
||||||
|
ClusterSize = 2,
|
||||||
|
Config = rabbit_ct_helpers:merge_app_env(
|
||||||
|
Config0, {rabbit, [{channel_tick_interval, 1000},
|
||||||
|
{quorum_tick_interval, 1000},
|
||||||
|
{consumer_timeout, 5000}]}),
|
||||||
|
Config1 = rabbit_ct_helpers:set_config(
|
||||||
|
Config, [ {rmq_nodename_suffix, Group},
|
||||||
|
{rmq_nodes_count, ClusterSize}
|
||||||
|
]),
|
||||||
|
rabbit_ct_helpers:run_steps(Config1,
|
||||||
|
rabbit_ct_broker_helpers:setup_steps() ++
|
||||||
|
rabbit_ct_client_helpers:setup_steps());
|
||||||
|
false ->
|
||||||
|
rabbit_ct_helpers:run_steps(Config0, [])
|
||||||
|
end.
|
||||||
|
|
||||||
|
end_per_group(Group, Config) ->
|
||||||
|
case lists:member({group, Group}, all()) of
|
||||||
|
true ->
|
||||||
|
rabbit_ct_helpers:run_steps(Config,
|
||||||
|
rabbit_ct_client_helpers:teardown_steps() ++
|
||||||
|
rabbit_ct_broker_helpers:teardown_steps());
|
||||||
|
false ->
|
||||||
|
Config
|
||||||
|
end.
|
||||||
|
|
||||||
|
init_per_testcase(Testcase, Config) ->
|
||||||
|
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
|
||||||
|
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
|
||||||
|
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
|
||||||
|
Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
|
||||||
|
{queue_name_2, Q2}]),
|
||||||
|
rabbit_ct_helpers:testcase_started(Config1, Testcase).
|
||||||
|
|
||||||
|
end_per_testcase(Testcase, Config) ->
|
||||||
|
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
|
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
|
||||||
|
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
|
||||||
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||||
|
|
||||||
|
consumer_timeout(Config) ->
|
||||||
|
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
|
QName = ?config(queue_name, Config),
|
||||||
|
declare_queue(Ch, Config, QName),
|
||||||
|
publish(Ch, QName, [<<"msg1">>]),
|
||||||
|
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||||
|
subscribe(Ch, QName, false),
|
||||||
|
receive
|
||||||
|
{#'basic.deliver'{delivery_tag = _,
|
||||||
|
redelivered = false}, _} ->
|
||||||
|
%% do nothing with the delivery should trigger timeout
|
||||||
|
receive
|
||||||
|
#'basic.cancel'{ } ->
|
||||||
|
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||||
|
ok
|
||||||
|
after 20000 ->
|
||||||
|
flush(1),
|
||||||
|
exit(cancel_never_happened)
|
||||||
|
end
|
||||||
|
after 5000 ->
|
||||||
|
exit(deliver_timeout)
|
||||||
|
end,
|
||||||
|
rabbit_ct_client_helpers:close_channel(Ch),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
consumer_timeout_basic_get(Config) ->
|
||||||
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
|
QName = ?config(queue_name, Config),
|
||||||
|
declare_queue(Ch, Config, QName),
|
||||||
|
publish(Ch, QName, [<<"msg1">>]),
|
||||||
|
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||||
|
[_DelTag] = consume(Ch, QName, [<<"msg1">>]),
|
||||||
|
erlang:monitor(process, Conn),
|
||||||
|
erlang:monitor(process, Ch),
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Ch, _} -> ok
|
||||||
|
after 30000 ->
|
||||||
|
flush(1),
|
||||||
|
exit(channel_exit_expected)
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Conn, _} ->
|
||||||
|
flush(1),
|
||||||
|
exit(unexpected_connection_exit)
|
||||||
|
after 2000 ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|
||||||
|
-define(CLIENT_CAPABILITIES,
|
||||||
|
[{<<"publisher_confirms">>, bool, true},
|
||||||
|
{<<"exchange_exchange_bindings">>, bool, true},
|
||||||
|
{<<"basic.nack">>, bool, true},
|
||||||
|
{<<"consumer_cancel_notify">>, bool, false},
|
||||||
|
{<<"connection.blocked">>, bool, true},
|
||||||
|
{<<"authentication_failure_close">>, bool, true}]).
|
||||||
|
|
||||||
|
consumer_timeout_no_basic_cancel_capability(Config) ->
|
||||||
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
||||||
|
Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
|
||||||
|
AmqpParams = #amqp_params_network{port = Port,
|
||||||
|
host = "localhost",
|
||||||
|
virtual_host = <<"/">>,
|
||||||
|
client_properties = Props
|
||||||
|
},
|
||||||
|
{ok, Conn} = amqp_connection:start(AmqpParams),
|
||||||
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
||||||
|
QName = ?config(queue_name, Config),
|
||||||
|
declare_queue(Ch, Config, QName),
|
||||||
|
publish(Ch, QName, [<<"msg1">>]),
|
||||||
|
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
||||||
|
erlang:monitor(process, Conn),
|
||||||
|
erlang:monitor(process, Ch),
|
||||||
|
subscribe(Ch, QName, false),
|
||||||
|
receive
|
||||||
|
{#'basic.deliver'{delivery_tag = _,
|
||||||
|
redelivered = false}, _} ->
|
||||||
|
%% do nothing with the delivery should trigger timeout
|
||||||
|
ok
|
||||||
|
after 5000 ->
|
||||||
|
exit(deliver_timeout)
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Ch, _} -> ok
|
||||||
|
after 30000 ->
|
||||||
|
flush(1),
|
||||||
|
exit(channel_exit_expected)
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Conn, _} ->
|
||||||
|
flush(1),
|
||||||
|
exit(unexpected_connection_exit)
|
||||||
|
after 2000 ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
%% Test helpers
|
||||||
|
%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
|
declare_queue(Ch, Config, QName) ->
|
||||||
|
Args = ?config(queue_args, Config),
|
||||||
|
Durable = ?config(queue_durable, Config),
|
||||||
|
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
|
||||||
|
arguments = Args,
|
||||||
|
durable = Durable}).
|
||||||
|
publish(Ch, QName, Payloads) ->
|
||||||
|
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
|
||||||
|
|| Payload <- Payloads].
|
||||||
|
|
||||||
|
consume(Ch, QName, Payloads) ->
|
||||||
|
consume(Ch, QName, false, Payloads).
|
||||||
|
|
||||||
|
consume(Ch, QName, NoAck, Payloads) ->
|
||||||
|
[begin
|
||||||
|
{#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
|
||||||
|
amqp_channel:call(Ch, #'basic.get'{queue = QName,
|
||||||
|
no_ack = NoAck}),
|
||||||
|
DTag
|
||||||
|
end || Payload <- Payloads].
|
||||||
|
|
||||||
|
subscribe(Ch, Queue, NoAck) ->
|
||||||
|
subscribe(Ch, Queue, NoAck, <<"ctag">>).
|
||||||
|
|
||||||
|
subscribe(Ch, Queue, NoAck, Ctag) ->
|
||||||
|
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
|
||||||
|
no_ack = NoAck,
|
||||||
|
consumer_tag = Ctag},
|
||||||
|
self()),
|
||||||
|
receive
|
||||||
|
#'basic.consume_ok'{consumer_tag = Ctag} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
flush(T) ->
|
||||||
|
receive X ->
|
||||||
|
ct:pal("flushed ~w", [X]),
|
||||||
|
flush(T)
|
||||||
|
after T ->
|
||||||
|
ok
|
||||||
|
end.
|
|
@ -52,8 +52,6 @@ groups() ->
|
||||||
consume_and_nack,
|
consume_and_nack,
|
||||||
consume_and_requeue_multiple_nack,
|
consume_and_requeue_multiple_nack,
|
||||||
consume_and_multiple_nack,
|
consume_and_multiple_nack,
|
||||||
consumer_timeout,
|
|
||||||
consumer_timeout_basic_get,
|
|
||||||
basic_cancel,
|
basic_cancel,
|
||||||
purge,
|
purge,
|
||||||
basic_recover,
|
basic_recover,
|
||||||
|
@ -136,9 +134,8 @@ init_per_group(Group, Config0) ->
|
||||||
true ->
|
true ->
|
||||||
ClusterSize = 2,
|
ClusterSize = 2,
|
||||||
Config = rabbit_ct_helpers:merge_app_env(
|
Config = rabbit_ct_helpers:merge_app_env(
|
||||||
Config0, {rabbit, [{channel_queue_cleanup_interval, 1000},
|
Config0, {rabbit, [{channel_tick_interval, 1000},
|
||||||
{quorum_tick_interval, 1000},
|
{quorum_tick_interval, 1000}]}),
|
||||||
{consumer_timeout, 15000}]}),
|
|
||||||
Config1 = rabbit_ct_helpers:set_config(
|
Config1 = rabbit_ct_helpers:set_config(
|
||||||
Config, [ {rmq_nodename_suffix, Group},
|
Config, [ {rmq_nodename_suffix, Group},
|
||||||
{rmq_nodes_count, ClusterSize}
|
{rmq_nodes_count, ClusterSize}
|
||||||
|
@ -425,55 +422,6 @@ consume_and_multiple_nack(Config) ->
|
||||||
rabbit_ct_client_helpers:close_channel(Ch),
|
rabbit_ct_client_helpers:close_channel(Ch),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
consumer_timeout(Config) ->
|
|
||||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
|
||||||
QName = ?config(queue_name, Config),
|
|
||||||
declare_queue(Ch, Config, QName),
|
|
||||||
publish(Ch, QName, [<<"msg1">>]),
|
|
||||||
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
|
||||||
subscribe(Ch, QName, false),
|
|
||||||
receive
|
|
||||||
{#'basic.deliver'{delivery_tag = _,
|
|
||||||
redelivered = false}, _} ->
|
|
||||||
%% do nothing with the delivery should trigger timeout
|
|
||||||
receive
|
|
||||||
#'basic.cancel'{ } ->
|
|
||||||
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
|
||||||
ok
|
|
||||||
after 30000 ->
|
|
||||||
flush(1),
|
|
||||||
exit(cancel_never_happened)
|
|
||||||
end
|
|
||||||
after 5000 ->
|
|
||||||
exit(deliver_timeout)
|
|
||||||
end,
|
|
||||||
rabbit_ct_client_helpers:close_channel(Ch),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
consumer_timeout_basic_get(Config) ->
|
|
||||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
|
||||||
QName = ?config(queue_name, Config),
|
|
||||||
declare_queue(Ch, Config, QName),
|
|
||||||
publish(Ch, QName, [<<"msg1">>]),
|
|
||||||
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
|
|
||||||
[_DelTag] = consume(Ch, QName, [<<"msg1">>]),
|
|
||||||
erlang:monitor(process, Conn),
|
|
||||||
erlang:monitor(process, Ch),
|
|
||||||
receive
|
|
||||||
{'DOWN', _, process, Ch, _} -> ok
|
|
||||||
after 30000 ->
|
|
||||||
flush(1),
|
|
||||||
exit(channel_exit_expected)
|
|
||||||
end,
|
|
||||||
receive
|
|
||||||
{'DOWN', _, process, Conn, _} ->
|
|
||||||
flush(1),
|
|
||||||
exit(unexpected_connection_exit)
|
|
||||||
after 2000 ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
ok.
|
|
||||||
|
|
||||||
subscribe_and_requeue_nack(Config) ->
|
subscribe_and_requeue_nack(Config) ->
|
||||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
QName = ?config(queue_name, Config),
|
QName = ?config(queue_name, Config),
|
||||||
|
|
|
@ -172,7 +172,7 @@ init_per_group(Group, Config) ->
|
||||||
ok ->
|
ok ->
|
||||||
ok = rabbit_ct_broker_helpers:rpc(
|
ok = rabbit_ct_broker_helpers:rpc(
|
||||||
Config2, 0, application, set_env,
|
Config2, 0, application, set_env,
|
||||||
[rabbit, channel_queue_cleanup_interval, 100]),
|
[rabbit, channel_tick_interval, 100]),
|
||||||
%% HACK: the larger cluster sizes benefit for a bit more time
|
%% HACK: the larger cluster sizes benefit for a bit more time
|
||||||
%% after clustering before running the tests.
|
%% after clustering before running the tests.
|
||||||
case Group of
|
case Group of
|
||||||
|
|
Loading…
Reference in New Issue