diff --git a/Makefile b/Makefile index 6620c8359b..9e818a8937 100644 --- a/Makefile +++ b/Makefile @@ -126,7 +126,8 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000}, + %% interval at which the channel can perform periodic actions + {channel_tick_interval, 60000}, %% Default max message size is 128 MB {max_message_size, 134217728} ] diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4d3ec6321..b705dbcbdc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -169,7 +169,7 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + tick_timer }). -define(QUEUE, lqueue). @@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, end, MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), - rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rabbit_event:if_enabled(State2, #ch.stats_timer, fun() -> emit_stats(State2) end), put_operation_timeout(), - State3 = init_queue_cleanup_timer(State2), + State3 = init_tick_timer(State2), {ok, State3, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer}, [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, - consumer_timeout = Timeout}, - queue_states = QueueStates0, - queue_names = QNames, - queue_consumers = QCons, - unacked_message_q = UAMQ}) -> +handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, + capabilities = Capabilities, + consumer_timeout = Timeout}, + queue_states = QueueStates0, + queue_names = QNames, + queue_consumers = QCons, + unacked_message_q = UAMQ}) -> QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), @@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> - case ConsumerTag of - _ when is_integer(ConsumerTag) -> - %% basic.get - there is no mechanims so we just crash the - %% channel + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + SupportsCancel = case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_cancel_notify">>) of + {bool, true} when is_binary(ConsumerTag) -> + true; + _ -> false + end, + case SupportsCancel of + false -> Ex = rabbit_misc:amqp_error(precondition_failed, - "basic.get ack timed out on channel ~w", + "consumer ack timed out on channel ~w", [Channel], none), handle_exception(Ex, State0); - % rabbit_misc:protocol_error(precondition_failed, - % "basic.get ack timed out on channel ~w ", - % [Channel]); - _ -> - rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " - "waiting on ack", - [rabbit_data_coercion:to_binary(ConsumerTag), - Channel]), + true -> QRef = qpid_to_ref(QPid), QName = maps:get(QRef, QNames), %% cancel the consumer with the client @@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, ?QUEUE:to_list(UAMQ)), QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, self(), QueueStates2), - State = State1#ch{queue_states = QueueStates, queue_consumers = maps:remove(QRef, QCons), unacked_message_q = Rem}, - noreply(init_queue_cleanup_timer(State)) + noreply(init_tick_timer(State)) end; _ -> noreply( - init_queue_cleanup_timer( + init_tick_timer( State0#ch{queue_states = QueueStates1})) end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> @@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName, consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> ok = - - rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]), - send(#'basic.cancel'{consumer_tag = CTag, + {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, nowait = true}, State); _ -> ok end, @@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. +init_tick_timer(State) -> + {ok, Interval} = application:get_env(rabbit, channel_tick_interval), + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl new file mode 100644 index 0000000000..8817b93c03 --- /dev/null +++ b/test/consumer_timeout_SUITE.erl @@ -0,0 +1,272 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(consumer_timeout_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [consumer_timeout, + consumer_timeout_basic_get, + consumer_timeout_no_basic_cancel_capability + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests}, + {mirrored_queue, [parallel], AllTests}, + {quorum_queue, [parallel], AllTests} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + Skip -> + Skip + end; +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config0) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}, + {consumer_timeout, 5000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config0, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +consumer_timeout(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + receive + #'basic.cancel'{ } -> + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + ok + after 20000 -> + flush(1), + exit(cancel_never_happened) + end + after 5000 -> + exit(deliver_timeout) + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. + +consumer_timeout_basic_get(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_DelTag] = consume(Ch, QName, [<<"msg1">>]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. + + +-define(CLIENT_CAPABILITIES, + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, false}, + {<<"connection.blocked">>, bool, true}, + {<<"authentication_failure_close">>, bool, true}]). + +consumer_timeout_no_basic_cancel_capability(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}], + AmqpParams = #amqp_params_network{port = Port, + host = "localhost", + virtual_host = <<"/">>, + client_properties = Props + }, + {ok, Conn} = amqp_connection:start(AmqpParams), + {ok, Ch} = amqp_connection:open_channel(Conn), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + ok + after 5000 -> + exit(deliver_timeout) + end, + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% + +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = Ctag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Ctag} -> + ok + end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index 00d597c75a..632a314d21 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -52,8 +52,6 @@ groups() -> consume_and_nack, consume_and_requeue_multiple_nack, consume_and_multiple_nack, - consumer_timeout, - consumer_timeout_basic_get, basic_cancel, purge, basic_recover, @@ -136,9 +134,8 @@ init_per_group(Group, Config0) -> true -> ClusterSize = 2, Config = rabbit_ct_helpers:merge_app_env( - Config0, {rabbit, [{channel_queue_cleanup_interval, 1000}, - {quorum_tick_interval, 1000}, - {consumer_timeout, 15000}]}), + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}]}), Config1 = rabbit_ct_helpers:set_config( Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} @@ -425,55 +422,6 @@ consume_and_multiple_nack(Config) -> rabbit_ct_client_helpers:close_channel(Ch), ok. -consumer_timeout(Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - declare_queue(Ch, Config, QName), - publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - subscribe(Ch, QName, false), - receive - {#'basic.deliver'{delivery_tag = _, - redelivered = false}, _} -> - %% do nothing with the delivery should trigger timeout - receive - #'basic.cancel'{ } -> - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - ok - after 30000 -> - flush(1), - exit(cancel_never_happened) - end - after 5000 -> - exit(deliver_timeout) - end, - rabbit_ct_client_helpers:close_channel(Ch), - ok. - -consumer_timeout_basic_get(Config) -> - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - declare_queue(Ch, Config, QName), - publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [_DelTag] = consume(Ch, QName, [<<"msg1">>]), - erlang:monitor(process, Conn), - erlang:monitor(process, Ch), - receive - {'DOWN', _, process, Ch, _} -> ok - after 30000 -> - flush(1), - exit(channel_exit_expected) - end, - receive - {'DOWN', _, process, Conn, _} -> - flush(1), - exit(unexpected_connection_exit) - after 2000 -> - ok - end, - ok. - subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 61f9328855..c23b7ac85e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -172,7 +172,7 @@ init_per_group(Group, Config) -> ok -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, - [rabbit, channel_queue_cleanup_interval, 100]), + [rabbit, channel_tick_interval, 100]), %% HACK: the larger cluster sizes benefit for a bit more time %% after clustering before running the tests. case Group of