Merge remote-tracking branch 'origin/master' into qq-testing

This commit is contained in:
Diana Corbacho 2019-01-10 08:59:41 +00:00
commit f1f2cac568
25 changed files with 2425 additions and 392 deletions

View File

@ -1,5 +1,6 @@
# vim:sw=2:et:
dist: xenial
sudo: false
language: erlang
notifications:
@ -10,15 +11,8 @@ notifications:
on_failure: always
addons:
apt:
sources:
- sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib
key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
- awscli
# Use Elixir from Erlang Solutions. The provided Elixir is
# installed with kiex but is old. We also can't use kiex to
# install a newer one because of GitHub API rate limiting.
- elixir=1.6.0-1
cache:
apt: true
env:
@ -27,10 +21,10 @@ env:
- secure: L1t0CHGR4RzOXwtkpM6feRKax95rszScBLqzjstEiMPkhjTsYTlAecnNxx6lTrGMnk5hQoi4PtbhmyZOX0siHTngTogoA/Nyn8etYzicU5ZO+qmBQOYpegz51lEu70ewXgkhEHzk9DtEPxfYviH9WiILrdUVRXXgZpoXq13p1QA=
otp_release:
- "19.3"
- "20.3"
- "21.2"
before_script:
- elixir --version
# The checkout made by Travis is a "detached HEAD" and branches
# information is missing. Our Erlang.mk's git_rmq fetch method relies
# on it, so we need to restore it.
@ -42,11 +36,6 @@ before_script:
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream v3.8.x:v3.8.x || :
git fetch upstream master:master || :
# Make sure we use Elixir from Erlang Solutions and not kiex.
- |
echo YES | kiex implode
elixir --version
elixir --version | grep -q 'Elixir 1.6.0'
script:
- make xref

View File

@ -130,13 +130,15 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
{channel_queue_cleanup_interval, 60000}
{channel_queue_cleanup_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
]
endef
LOCAL_DEPS = sasl mnesia os_mon inets
BUILD_DEPS = rabbitmq_cli syslog
DEPS = ranch lager rabbit_common ra
DEPS = ranch lager rabbit_common ra sysmon_handler
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
dep_syslog = git https://github.com/schlagert/syslog 3.4.5
@ -186,13 +188,28 @@ tests:: bats
SLOW_CT_SUITES := backing_queue \
cluster_rename \
clustering_management \
config_schema \
dynamic_ha \
eager_sync \
health_check \
lazy_queue \
metrics \
msg_store \
partitions \
per_user_connection_tracking \
per_vhost_connection_limit \
per_vhost_msg_store \
per_vhost_queue_limit \
policy \
priority_queue \
queue_master_location \
simple_ha
quorum_queue \
rabbit_core_metrics_gc \
rabbit_fifo_prop \
simple_ha \
sync_detection \
unit_inbroker_parallel \
vhost
FAST_CT_SUITES := $(filter-out $(sort $(SLOW_CT_SUITES)),$(CT_SUITES))
ct-fast: CT_SUITES = $(FAST_CT_SUITES)

View File

@ -470,7 +470,7 @@
## Disabling background GC may reduce latency for client operations,
## keeping it enabled may reduce median RAM usage by the binary heap
## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html).
##
##
## Before trying this option, please take a look at the memory
## breakdown (http://www.rabbitmq.com/memory-use.html).
##
@ -533,18 +533,49 @@
##
# management.http_log_dir = /path/to/access.log
## Change the port on which the HTTP listener listens,
## specifying an interface for the web server to bind to.
## Also set the listener to use TLS and provide TLS options.
## HTTP listener and embedded Web server settings.
# ## See https://rabbitmq.com/management.html for details.
#
# management.tcp.port = 15672
# management.tcp.ip = 0.0.0.0
#
# management.tcp.shutdown_timeout = 7000
# management.tcp.max_keepalive = 120
# management.tcp.idle_timeout = 120
# management.tcp.inactivity_timeout = 120
# management.tcp.request_timeout = 120
# management.tcp.compress = true
## HTTPS listener settings.
## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details.
##
# management.ssl.port = 15671
# management.ssl.cacertfile = /path/to/ca_certificate.pem
# management.ssl.certfile = /path/to/server_certificate.pem
# management.ssl.keyfile = /path/to/server_key.pem
## More TLS options
# management.ssl.honor_cipher_order = true
# management.ssl.honor_ecc_order = true
# management.ssl.client_renegotiation = false
# management.ssl.secure_renegotiate = true
## Supported TLS versions
# management.ssl.versions.1 = tlsv1.2
# management.ssl.versions.2 = tlsv1.1
## Cipher suites the server is allowed to use
# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384
# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384
# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384
# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384
# management.listener.port = 15672
# management.listener.ip = 127.0.0.1
# management.listener.ssl = true
# management.listener.ssl_opts.cacertfile = /path/to/cacert.pem
# management.listener.ssl_opts.certfile = /path/to/cert.pem
# management.listener.ssl_opts.keyfile = /path/to/key.pem
## One of 'basic', 'detailed' or 'none'. See
## http://rabbitmq.com/management.html#fine-stats for more details.
@ -583,13 +614,39 @@
# STOMP section
# =======================================
## Network Configuration. The format is generally the same as for the core broker.
##
# stomp.listeners.tcp.default = 61613
## See https://rabbitmq.com/stomp.html for details.
## Same for ssl listeners
## TCP listeners.
##
# stomp.listeners.tcp.1 = 127.0.0.1:61613
# stomp.listeners.tcp.2 = ::1:61613
## TCP listener settings
##
# stomp.tcp_listen_options.backlog = 2048
# stomp.tcp_listen_options.recbuf = 131072
# stomp.tcp_listen_options.sndbuf = 131072
#
# stomp.tcp_listen_options.keepalive = true
# stomp.tcp_listen_options.nodelay = true
#
# stomp.tcp_listen_options.exit_on_close = true
# stomp.tcp_listen_options.send_timeout = 120
## Proxy protocol support
##
# stomp.proxy_protocol = false
## TLS listeners
## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details.
# stomp.listeners.ssl.default = 61614
#
# ssl_options.cacertfile = path/to/cacert.pem
# ssl_options.certfile = path/to/cert.pem
# ssl_options.keyfile = path/to/key.pem
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = true
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
@ -642,6 +699,52 @@
# MQTT section
# =======================================
## TCP listener settings.
##
# mqtt.listeners.tcp.1 = 127.0.0.1:61613
# mqtt.listeners.tcp.2 = ::1:61613
## TCP listener options (as per the broker configuration).
##
# mqtt.tcp_listen_options.backlog = 4096
# mqtt.tcp_listen_options.recbuf = 131072
# mqtt.tcp_listen_options.sndbuf = 131072
#
# mqtt.tcp_listen_options.keepalive = true
# mqtt.tcp_listen_options.nodelay = true
#
# mqtt.tcp_listen_options.exit_on_close = true
# mqtt.tcp_listen_options.send_timeout = 120
## TLS listener settings
## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
#
# mqtt.listeners.ssl.default = 8883
#
# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
# ssl_options.certfile = /path/to/tls/server_certificate.pem
# ssl_options.keyfile = /path/to/tls/server_key.pem
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = true
#
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# mqtt.num_acceptors.tcp = 10
# mqtt.num_acceptors.ssl = 10
## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like STOMP or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# mqtt.proxy_protocol = false
## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
@ -672,34 +775,6 @@
##
# mqtt.prefetch = 10
## TCP/SSL Configuration (as per the broker configuration).
##
# mqtt.listeners.tcp.default = 1883
## Same for ssl listener
##
# mqtt.listeners.ssl.default = 1884
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
##
# mqtt.num_acceptors.tcp = 10
# mqtt.num_acceptors.ssl = 10
## TCP listener options (as per the broker configuration).
##
# mqtt.tcp_listen_options.backlog = 128
# mqtt.tcp_listen_options.nodelay = true
## Whether or not to enable proxy protocol support.
## Once enabled, clients cannot directly connect to the broker
## anymore. They must connect through a load balancer that sends the
## proxy protocol header to the broker at connection time.
## This setting applies only to STOMP clients, other protocols
## like STOMP or AMQP have their own setting to enable proxy protocol.
## See the plugins or broker documentation for more information.
##
# mqtt.proxy_protocol = false
## ----------------------------------------------------------------------------
## RabbitMQ AMQP 1.0 Support

View File

@ -258,7 +258,7 @@ end}.
{translation, "rabbit.ssl_options.ciphers",
fun(Conf) ->
Settings = cuttlefish_variable:filter_by_prefix("ssl_options.ciphers", Conf),
[V || {_, V} <- Settings]
lists:reverse([V || {_, V} <- Settings])
end}.
%% ===========================================================================
@ -554,6 +554,9 @@ end}.
}.
{mapping, "msx_message_size", "rabbit.max_message_size",
[{datatype, integer}, {validators, ["less_then_512MB"]}]}.
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@ -1352,6 +1355,103 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
% ==========================
% sysmon_handler section
% ==========================
%% @doc The threshold at which to warn about the number of processes
%% that are overly busy. Processes with large heaps or that take a
%% long time to garbage collect will count toward this threshold.
{mapping, "sysmon_handler.thresholds.busy_processes", "sysmon_handler.process_limit", [
{default, 30},
{datatype, integer},
hidden
]}.
%% @doc The threshold at which to warn about the number of ports that
%% are overly busy. Ports with full input buffers count toward this
%% threshold.
{mapping, "sysmon_handler.thresholds.busy_ports", "sysmon_handler.port_limit", [
{default, 2},
{datatype, integer},
hidden
]}.
%% @doc A process will become busy when it exceeds this amount of time
%% doing garbage collection.
%%
%% NOTE: Enabling this setting can cause performance problems on
%% multi-core systems.
%% @see sysmon_handler.thresholds.busy_processes
{mapping, "sysmon_handler.triggers.process.garbage_collection", "sysmon_handler.gc_ms_limit", [
{default, off},
{datatype, [{atom, off},
{duration, ms}]},
hidden
]}.
{translation, "sysmon_handler.gc_ms_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.garbage_collection", Conf) of
off -> 0;
Int -> Int
end
end}.
%% @doc A process will become busy when it exceeds this amount of time
%% during a single process scheduling & execution cycle.
{mapping, "sysmon_handler.triggers.process.long_scheduled_execution", "sysmon_handler.schedule_ms_limit", [
{default, off},
{datatype, [{atom, off},
{duration, ms}]},
hidden
]}.
{translation, "sysmon_handler.schedule_ms_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.long_scheduled_execution", Conf) of
off -> 0;
Int -> Int
end
end}.
%% @doc A process will become busy when its heap exceeds this size.
%% @see sysmon_handler.thresholds.busy_processes
{mapping, "sysmon_handler.triggers.process.heap_size", "sysmon_handler.heap_word_limit", [
{default, "160444000"},
{datatype, [bytesize, {atom, off}]},
hidden
]}.
{translation, "sysmon_handler.heap_word_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.heap_size", Conf) of
off -> 0;
Bytes ->
WordSize = erlang:system_info(wordsize),
Bytes div WordSize
end
end}.
%% @doc Whether ports with full input buffers will be counted as
%% busy. Ports can represent open files or network sockets.
%% @see sysmon_handler.thresholds.busy_ports
{mapping, "sysmon_handler.triggers.port", "sysmon_handler.busy_port", [
{default, on},
{datatype, flag},
hidden
]}.
%% @doc Whether distribution ports with full input buffers will be
%% counted as busy. Distribution ports connect Erlang nodes within a
%% single cluster.
%% @see sysmon_handler.thresholds.busy_ports
{mapping, "sysmon_handler.triggers.distribution_port", "sysmon_handler.busy_dist_port", [
{default, on},
{datatype, flag},
hidden
]}.
% ===============================
% Validators
% ===============================
@ -1361,6 +1461,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.
{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 536870912
end}.
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1

View File

@ -115,6 +115,7 @@ dep_lager = hex 3.6.5
dep_ra = git https://github.com/rabbitmq/ra.git master
dep_ranch = hex 1.7.1
dep_recon = hex 2.3.6
dep_sysmon_handler = hex 1.0.0
RABBITMQ_COMPONENTS = amqp_client \
amqp10_common \

View File

@ -156,6 +156,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
-rabbit_boot_step({rabbit_sysmon_minder,
[{description, "sysmon_handler supervisor"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_sysmon_minder]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
@ -225,7 +232,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]).
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).

View File

@ -29,7 +29,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
not_found -> Q1 = rabbit_policy:set(Q),
Q2 = Q1#amqqueue{state = live},
ok = store_queue(Q2),
B = add_default_binding(Q2),
fun () -> B(), {created, Q2} end;
fun () -> {created, Q2} end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
end;
[ExistingQ] ->
@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
rabbit_binding:add(#binding{source = ExchangeName,
destination = QueueName,
key = RoutingKey,
args = []},
?INTERNAL_USER).
lookup([]) -> []; %% optimisation
lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
@ -666,6 +656,7 @@ declare_args() ->
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
{<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}].
@ -708,6 +699,12 @@ check_max_priority_arg({Type, Val}, Args) ->
Error -> Error
end.
check_single_active_consumer_arg({Type, Val}, Args) ->
case check_bool_arg({Type, Val}, Args) of
ok -> ok;
Error -> Error
end.
check_default_quorum_initial_group_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
@ -757,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@ -36,8 +36,8 @@
-record(q, {
%% an #amqqueue record
q,
%% none | {exclusive consumer channel PID, consumer tag}
exclusive_consumer,
%% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
active_consumer,
%% Set to true if a queue has ever had a consumer.
%% This is used to determine when to delete auto-delete queues.
has_had_consumers,
@ -94,7 +94,9 @@
%% example.
mirroring_policy_version = 0,
%% running | flow | idle
status
status,
%% true | false
single_active_consumer_on
}).
%%----------------------------------------------------------------------------
@ -155,15 +157,20 @@ init(Q) ->
?MODULE}.
init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
args_policy_version = 0,
overflow = 'drop-head'},
SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
{bool, true} -> true;
_ -> false
end,
State = #q{q = Q,
active_consumer = none,
has_had_consumers = false,
consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
args_policy_version = 0,
overflow = 'drop-head',
single_active_consumer_on = SingleActiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@ -545,7 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(State = #q{consumers = Consumers}) ->
assert_invariant(#q{single_active_consumer_on = true}) ->
%% queue may contain messages and have available consumers with exclusive consumer
ok;
assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@ -619,7 +629,8 @@ run_message_queue(ActiveConsumersChanged, State) ->
true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
qname(State), State#q.consumers) of
qname(State), State#q.consumers,
State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
@ -645,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
end, qname(State), State#q.consumers) of
end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
@ -814,9 +825,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
senders = Senders}) ->
handle_ch_down(DownPid, State = #q{consumers = Consumers,
active_consumer = Holder,
single_active_consumer_on = SingleActiveConsumerOn,
senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false ->
Senders;
@ -840,12 +852,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
[emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
Holder1 = case Holder of
{DownPid, _} -> none;
Other -> Other
end,
Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
active_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@ -860,6 +869,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end
end.
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
case CurrentSingleActiveConsumer of
{DownChPid, _} ->
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
false ->
CurrentSingleActiveConsumer
end;
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
case CurrentSingleActiveConsumer of
{DownChPid, _} -> none;
Other -> Other
end.
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@ -1007,14 +1032,14 @@ i(effective_policy_definition, #q{q = Q}) ->
undefined -> [];
Def -> Def
end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTag}, single_active_consumer_on = false}) ->
ChPid;
i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
i(exclusive_consumer_pid, _) ->
'';
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_active_consumer_on = false}) ->
ConsumerTag;
i(exclusive_consumer_tag, _) ->
'';
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
@ -1213,49 +1238,81 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use -> reply({error, exclusive_consume_unavailable}, State);
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State),
ActingUser, Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
State1 = State#q{consumers = Consumers1,
has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
QName = qname(State1),
AckRequired = not NoAck,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
_From, State = #q{consumers = Consumers,
active_consumer = Holder,
single_active_consumer_on = SingleActiveConsumerOn}) ->
ConsumerRegistration = case SingleActiveConsumerOn of
true ->
case ExclusiveConsume of
true ->
{error, reply({error, exclusive_consume_unavailable}, State)};
false ->
Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State),
ActingUser, Consumers),
case Holder of
none ->
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
end
end;
false ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
ok ->
Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State),
ActingUser, Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
active_consumer = ExclusiveConsumer}}
end
end,
case ConsumerRegistration of
{error, Reply} ->
Reply;
{state, State1} ->
ok = maybe_send_reply(ChPid, OkMsg),
QName = qname(State1),
AckRequired = not NoAck,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
State = #q{consumers = Consumers,
active_consumer = Holder,
single_active_consumer_on = SingleActiveConsumerOn }) ->
ok = maybe_send_reply(ChPid, OkMsg),
case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
Consumers1 ->
Holder1 = case Holder of
{ChPid, ConsumerTag} -> none;
_ -> Holder
end,
Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag,
Holder, SingleActiveConsumerOn, Consumers1
),
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
active_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@ -1325,6 +1382,24 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
_SingleActiveConsumerIsOn = true, Consumers) ->
case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of
true ->
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
false ->
CurrentSingleActiveConsumer
end;
new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
_SingleActiveConsumerIsOn = false, _Consumers) ->
case CurrentSingleActiveConsumer of
{ChPid, ConsumerTag} -> none;
_ -> CurrentSingleActiveConsumer
end.
handle_cast(init, State) ->
try
init_it({no_barrier, non_clean_shutdown}, none, State)
@ -1432,7 +1507,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
run_message_queue(true, State1)
end);
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);

View File

@ -27,6 +27,10 @@
-export([has_for_source/1, remove_for_source/1,
remove_for_destination/2, remove_transient_for_destination/1]).
-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
kind = exchange,
name = <<>>}).
%%----------------------------------------------------------------------------
-export_type([key/0, deletions/0]).
@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
(Serial, false) -> x_callback(Serial, X, add_binding, B)
end).
exists(#binding{source = ?DEFAULT_EXCHANGE(_),
destination = #resource{kind = queue, name = QName} = Queue,
key = QName,
args = []}) ->
case rabbit_amqqueue:lookup(Queue) of
{ok, _} -> true;
{error, not_found} -> false
end;
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
@ -243,9 +255,17 @@ list(VHostPath) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
%% if there are any default exchange bindings left after an upgrade
%% of a pre-3.8 database, filter them out
AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)],
Filtered = lists:filter(fun(#binding{source = S}) ->
S =/= ?DEFAULT_EXCHANGE(VHostPath)
end, AllBindings),
implicit_bindings(VHostPath) ++ Filtered.
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
implicit_bindings(VHostPath);
list_for_source(SrcName) ->
mnesia:async_dirty(
fun() ->
@ -255,16 +275,43 @@ list_for_source(SrcName) ->
end).
list_for_destination(DstName) ->
mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
[reverse_binding(B) ||
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
end).
implicit_for_destination(DstName) ++
mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
[reverse_binding(B) ||
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
end).
implicit_bindings(VHostPath) ->
DstQueues = rabbit_amqqueue:list_names(VHostPath),
[ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
destination = DstQueue,
key = QName,
args = []}
|| DstQueue = #resource{name = QName} <- DstQueues ].
implicit_for_destination(DstQueue = #resource{kind = queue,
virtual_host = VHostPath,
name = QName}) ->
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
destination = DstQueue,
key = QName,
args = []}];
implicit_for_destination(_) ->
[].
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
#resource{kind = queue,
virtual_host = VHostPath,
name = QName} = DstQueue) ->
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
destination = DstQueue,
key = QName,
args = []}];
list_for_source_and_destination(SrcName, DstName) ->
mnesia:async_dirty(
fun() ->

View File

@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
-export([list_queue_states/1]).
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
-export([handle_method/5]).
@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
queue_cleanup_timer
queue_cleanup_timer,
%% Message content size limit
max_message_size
}).
-define(QUEUE, lqueue).
@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = #{}},
queue_states = #{},
max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@ -558,7 +562,6 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
try handle_method(rabbit_channel_interceptor:intercept_in(
expand_shortcuts(Method, State), Content, IState),
State) of
@ -793,6 +796,16 @@ code_change(_OldVsn, State, _Extra) ->
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-spec get_max_message_size() -> non_neg_integer().
get_max_message_size() ->
case application:get_env(rabbit, max_message_size) of
{ok, MS} when is_integer(MS) ->
erlang:min(MS, ?MAX_MSG_SIZE);
_ ->
?MAX_MSG_SIZE
end.
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@ -985,12 +998,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.
check_msg_size(Content) ->
check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
case Size > ?MAX_MSG_SIZE of
true -> precondition_failed("message size ~B larger than max size ~B",
[Size, ?MAX_MSG_SIZE]);
false -> ok
case Size of
S when S > MaxMessageSize ->
ErrorMessage = case MaxMessageSize of
?MAX_MSG_SIZE ->
"message size ~B is larger than max size ~B";
_ ->
"message size ~B is larger than configured max size ~B"
end,
precondition_failed(ErrorMessage,
[Size, MaxMessageSize]);
_ -> ok
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@ -1164,16 +1184,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username} = User,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid}) ->
check_msg_size(Content),
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username} = User,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_fifo).
@ -42,6 +42,7 @@
query_ra_indexes/1,
query_consumer_count/1,
query_consumers/1,
query_stat/1,
usage/1,
zero/1,
@ -57,8 +58,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
make_update_config/1,
make_stat/0
make_update_config/1
]).
-type raw_msg() :: term().
@ -146,8 +146,7 @@
#discard{} |
#credit{} |
#purge{} |
#update_config{} |
#stat{}.
#update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@ -181,6 +180,8 @@
suspected_down = false :: boolean()
}).
-type consumer() :: #consumer{}.
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@ -235,7 +236,12 @@
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
PrefixMsgs :: non_neg_integer()},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()
msg_bytes_checkout = 0 :: non_neg_integer(),
%% whether single active consumer is on or not for this queue
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-opaque state() :: #state{}.
@ -244,7 +250,8 @@
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer()}.
shadow_copy_interval => non_neg_integer(),
single_active_consumer_on => boolean()}.
-export_type([protocol/0,
delivery/0,
@ -271,9 +278,16 @@ update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
false ->
default
end,
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI}.
shadow_copy_interval = SHI,
consumer_strategy = ConsumerStrategy}.
zero(_) ->
0.
@ -434,19 +448,6 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
apply(_, #stat{}, #state{name = Name,
messages = Messages,
ra_indexes = Indexes,
consumers = Cons,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
maps:size(Cons), % Consumers
EnqueueBytes,
CheckoutBytes},
{State, {stat, Metrics}, []};
apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@ -549,7 +550,8 @@ state_enter(leader, #state{consumers = Cons,
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
Effects = Mons ++ Nots,
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
@ -663,6 +665,11 @@ query_consumers(#state{consumers = Consumers}) ->
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers).
query_stat(#state{messages = M,
consumers = Consumers}) ->
{maps:size(M), maps:size(Consumers)}.
%% other
-spec usage(atom()) -> float().
@ -708,6 +715,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
{Effects0, #state{consumer_strategy = default} = S0}) ->
%% general case, single active consumer off
cancel_consumer0(ConsumerId, {Effects0, S0});
cancel_consumer(ConsumerId,
{Effects0, #state{consumer_strategy = single_active,
waiting_consumers = [] } = S0}) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, {Effects0, S0});
cancel_consumer(ConsumerId,
{Effects0, #state{consumers = Cons0,
consumer_strategy = single_active,
waiting_consumers = WaitingConsumers0 } = State0}) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
{_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} ->
% The active consumer is to be removed
% Cancel it
S = return_all(State0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0,
#state{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
{Effects, State1};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
{value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0),
% A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
{Effects0, State0#state{waiting_consumers = WaitingConsumers1}}
end.
cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
@ -718,9 +761,9 @@ cancel_consumer(ConsumerId,
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
_ ->
{Effects, S#state{consumers = Cons}}
end;
end;
error ->
% already removed - do nothing
%% already removed: do nothing
{Effects0, S0}
end.
@ -1103,23 +1146,42 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
#state{consumer_strategy = default} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
#state{consumers = Cons0,
consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
#state{consumer_strategy = single_active,
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
State0#state{waiting_consumers = WaitingConsumers1}.
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
fun(S) ->
%% remove any in-flight messages from
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
S#consumer{lifetime = Life,
credit = C}
end, Init, Cons0),
fun(S) ->
%% remove any in-flight messages from
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
S#consumer{lifetime = Life,
credit = C}
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@ -1197,9 +1259,6 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-spec make_stat() -> protocol().
make_stat() -> #stat{}.
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
@ -1506,7 +1565,6 @@ cancelled_checkout_out_test() ->
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
?debugFmt("State3 ~p", [State3]),
{_State, {dequeue, {_, {_, second}}}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
@ -1704,8 +1762,7 @@ return_prefix_msg_count_test() ->
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
{State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
{_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
ok.
@ -1777,7 +1834,6 @@ run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@ -1790,11 +1846,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
?debugFmt("Name ~p~nS~p~nState~p~nn",
[Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@ -1917,6 +1970,71 @@ down_returns_checked_out_in_order_test() ->
?assertEqual(lists:sort(Returns), Returns),
ok.
single_active_consumer_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
single_active_consumer_on => true}),
?assertEqual(single_active, State0#state.consumer_strategy),
?assertEqual(0, map_size(State0#state.consumers)),
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
#{},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
State),
NewState
end,
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
% the first registered consumer is the active one, the others are waiting
?assertEqual(1, map_size(State1#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
?assertEqual(3, length(State1#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
{State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
% the cancelled consumer has been removed from waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
% cancelling the active consumer
{State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
% cancelling the active consumer
{State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#state.waiting_consumers)),
% cancelling the last consumer
{State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
?assertEqual(0, length(State5#state.waiting_consumers)),
ok.
meta(Idx) ->
#{index => Idx, term => 1}.

View File

@ -399,22 +399,12 @@ purge(Node) ->
Err
end.
-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer(),
non_neg_integer(), non_neg_integer(),
non_neg_integer(), non_neg_integer()}}
-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
| {error | timeout, term()}.
stat(Servers) ->
try_process_stat(Servers, rabbit_fifo:make_stat()).
try_process_stat([Server | Rem], Cmd) ->
case ra:process_command(Server, Cmd, 30000) of
{ok, {stat, Reply}, _} ->
{ok, Reply};
Err when length(Rem) =:= 0 ->
Err;
_ ->
try_process_stat(Rem, Cmd)
end.
stat(Leader) ->
Query = fun (State) -> rabbit_fifo:query_stat(State) end,
{ok, {_, Stat}, _} = ra:local_query(Leader, Query),
Stat.
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().

View File

@ -11,17 +11,18 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1]).
credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
consumer_tag/1]).
%%----------------------------------------------------------------------------
@ -42,7 +43,7 @@
acktags,
consumer_count,
%% Queue of {ChPid, #consumer{}} for consumers which have
%% been blocked for any reason
%% been blocked (rate/prefetch limited) for any reason
blocked_consumers,
%% The limiter itself
limiter,
@ -57,6 +58,9 @@
use :: {'inactive',
time_micros(), time_micros(), ratio()} |
{'active', time_micros(), ratio()}}.
-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(),
prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(),
user::rabbit_types:username()}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@ -81,7 +85,8 @@
state()}.
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state()) ->
rabbit_amqqueue:name(), state(), boolean(),
none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@ -95,6 +100,7 @@
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
-spec consumer_tag(consumer()) -> rabbit_types:ctag().
%%----------------------------------------------------------------------------
@ -189,10 +195,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
deliver(_FetchFun, _QName, false, State, true, none) ->
{undelivered, false,
State#state{use = update_use(State#state.use, inactive)}};
deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) ->
{ChPid, Consumer} = SingleActiveConsumer,
%% blocked (rate/prefetch limited) consumers are removed from the queue state, but not the exclusive_consumer field,
%% so we need to do this check to avoid adding the exclusive consumer to the channel record
%% over and over
case is_blocked(SingleActiveConsumer) of
true ->
{undelivered, false,
State#state{use = update_use(State#state.use, inactive)}};
false ->
case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of
{delivered, R} ->
{delivered, false, R, State};
undelivered ->
{ChPid, Consumer} = SingleActiveConsumer,
Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
{undelivered, true,
State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
end
end;
deliver(FetchFun, QName, ConsumersChanged,
State = #state{consumers = Consumers}) ->
State = #state{consumers = Consumers}, false, _SingleActiveConsumer) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
@ -205,7 +235,7 @@ deliver(FetchFun, QName, ConsumersChanged,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
State#state{consumers = Tail})
State#state{consumers = Tail}, false, _SingleActiveConsumer)
end
end.
@ -246,6 +276,10 @@ deliver_to_consumer(FetchFun,
unsent_message_count = Count + 1}),
R.
is_blocked(Consumer = {ChPid, _C}) ->
#cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
priority_queue:member(Consumer, BlockedConsumers).
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
@ -357,6 +391,29 @@ utilisation(#state{use = {active, Since, Avg}}) ->
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) ->
true;
is_same(_ChPid, _ConsumerTag, _Consumer) ->
false.
get_consumer(#state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{{value, Consumer, _Priority}, _Tail} -> Consumer;
{empty, _} -> undefined
end.
get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
(CP == ChPid) and (CT == ConsumerTag)
end, Consumers),
case priority_queue:out_p(Consumers1) of
{empty, _} -> undefined;
{{value, Consumer, _Priority}, _Tail} -> Consumer
end.
consumer_tag(#consumer{tag = CTag}) ->
CTag.
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->

View File

@ -38,45 +38,33 @@ description() ->
<<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{} = Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
VHosts = rabbit_vhost:list(),
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
{ok, MinMaster}.
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
QueueNames = rabbit_amqqueue:list_names(),
MastersPerNode = lists:foldl(
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
{ok, Master} when is_atom(Master) ->
case maps:is_key(Master, NodeMasters) of
true -> maps:update_with(Master,
fun(N) -> N + 1 end,
NodeMasters);
false -> NodeMasters
end;
_ -> NodeMasters
end
end,
maps:from_list([{N, 0} || N <- Cluster]),
QueueNames),
%%---------------------------------------------------------------------------
%% Private helper functions
%%---------------------------------------------------------------------------
get_min_master(Cluster, BoundQueueMasters) ->
lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
Node <- Cluster ]).
count_masters(Node, Masters) ->
length([ X || X <- Masters, X == Node ]).
get_bound_queue_masters_per_vhost([], Acc) ->
lists:flatten(Acc);
get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
BoundQueueNames =
lists:filtermap(
fun(#binding{destination =#resource{kind = queue,
name = QueueName}}) ->
{true, QueueName};
(_) ->
false
end,
rabbit_binding:list(VHost)),
UniqQueueNames = lists:usort(BoundQueueNames),
BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
QueueName, VHost) of
{ok, Master} when is_atom(Master) ->
[Master|QueueMastersAcc];
_ -> QueueMastersAcc
end,
get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
{MinNode, _NMasters} = maps:fold(
fun(Node, NMasters, init) ->
{Node, NMasters};
(Node, NMasters, {MinNode, MinMasters}) ->
case NMasters < MinMasters of
true -> {Node, NMasters};
false -> {MinNode, MinMasters}
end
end,
init,
MastersPerNode),
{ok, MinNode}.

View File

@ -37,7 +37,8 @@
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@ -150,7 +151,14 @@ ra_machine_config(Q = #amqqueue{name = QName,
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]}}.
become_leader_handler => {?MODULE, become_leader, [QName]},
single_active_consumer_on => single_active_consumer_on(Q)}.
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
{bool, true} -> true;
_ -> false
end.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
@ -424,11 +432,12 @@ infos(QName) ->
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
stat(#amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) ->
case rabbit_fifo_client:stat([{Name, N} || N <- Nodes]) of
{ok, {Ready, _, _, Consumers, _, _}} ->
{ok, Ready, Consumers};
_ ->
stat(#amqqueue{pid = Leader}) ->
try
{Ready, Consumers} = rabbit_fifo_client:stat(Leader),
{ok, Ready, Consumers}
catch
_:_ ->
%% Leader is not available, cluster might be in minority
{ok, 0, 0}
end.

View File

@ -0,0 +1,230 @@
%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%% @doc A custom event handler to the `sysmon_handler' application's
%% `system_monitor' event manager.
%%
%% This module attempts to discover more information about a process
%% that generates a system_monitor event.
-module(rabbit_sysmon_handler).
-behaviour(gen_event).
%% API
-export([add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-record(state, {timer_ref :: reference()}).
-define(INACTIVITY_TIMEOUT, 5000).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
add_handler() ->
%% Vulnerable to race conditions (installing handler multiple
%% times), but risk is zero in the common OTP app startup case.
case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of
true ->
ok;
false ->
sysmon_handler_filter:add_custom_handler(?MODULE, [])
end.
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}, hibernate}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({monitor, Pid, Type, _Info},
State=#state{timer_ref=TimerRef}) when Pid == self() ->
%% Reset the inactivity timeout
NewTimerRef = reset_timer(TimerRef),
maybe_collect_garbage(Type),
{ok, State#state{timer_ref=NewTimerRef}};
handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) ->
%% Reset the inactivity timeout
NewTimerRef = reset_timer(TimerRef),
{Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]),
{ok, State#state{timer_ref=NewTimerRef}};
handle_event(Event, State=#state{timer_ref=TimerRef}) ->
NewTimerRef = reset_timer(TimerRef),
rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]),
{ok, State#state{timer_ref=NewTimerRef}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(_Call, State) ->
Reply = not_supported,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(inactivity_timeout, State) ->
%% No events have arrived for the timeout period
%% so hibernate to free up resources.
{ok, State, hibernate};
handle_info(Info, State) ->
rabbit_log:info("handle_info got ~p", [Info]),
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
format_pretty_proc_or_port_info(PidOrPort) ->
try
case get_pretty_proc_or_port_info(PidOrPort) of
undefined ->
{"", []};
Res ->
Res
end
catch C:E:S ->
{"Pid ~w, ~W ~W at ~w\n",
[PidOrPort, C, 20, E, 20, S]}
end.
get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
Infos = [registered_name, initial_call, current_function, message_queue_len],
case process_info(Pid, Infos) of
undefined ->
undefined;
[] ->
undefined;
[{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
ICT = case proc_lib:translate_initial_call(Pid) of
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
ICT1;
ICT2 ->
{initial_call, ICT2}
end,
RNL = if RN0 == [] -> [];
true -> [{name, RN0}]
end,
{"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
end;
get_pretty_proc_or_port_info(Port) when is_port(Port) ->
PortInfo = erlang:port_info(Port),
{value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
QueueSize = [erlang:port_info(Port, queue_size)],
Connected = case proplists:get_value(connected, PortInfo2) of
undefined ->
[];
ConnectedPid ->
case proc_lib:translate_initial_call(ConnectedPid) of
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
[];
ICT ->
[{initial_call, ICT}]
end
end,
{"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}.
%% @doc If the message type is due to a large heap warning
%% and the source is ourself, go ahead and collect garbage
%% to avoid the death spiral.
-spec maybe_collect_garbage(atom()) -> ok.
maybe_collect_garbage(large_heap) ->
erlang:garbage_collect(),
ok;
maybe_collect_garbage(_) ->
ok.
-spec reset_timer(undefined | reference()) -> reference().
reset_timer(undefined) ->
erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout);
reset_timer(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
reset_timer(undefined).

View File

@ -0,0 +1,156 @@
%% -------------------------------------------------------------------
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(rabbit_sysmon_minder).
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
%% Add our system_monitor event handler. We do that here because
%% we have a process at our disposal (i.e. ourself) to receive the
%% notification in the very unlikely event that the
%% sysmon_handler has crashed and been removed from the
%% sysmon_handler gen_event server. (If we had a supervisor
%% or app-starting process add the handler, then if the handler
%% crashes, nobody will act on the crash notification.)
rabbit_sysmon_handler:add_handler(),
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info({gen_event_EXIT, rabbit_sysmon_handler, _}, State) ->
%% SASL will create an error message, no need for us to duplicate it.
%%
%% Our handler should never crash, but it did indeed crash. If
%% there's a pathological condition somewhere that's generating
%% lots of unforseen things that crash core's custom handler, we
%% could make things worse by jumping back into the exploding
%% volcano. Wait a little bit before jumping back. Besides, the
%% system_monitor data is nice but is not critical: there is no
%% need to make things worse if things are indeed bad, and if we
%% miss a few seconds of system_monitor events, the world will not
%% end.
timer:sleep(2*1000),
rabbit_sysmon_handler:add_handler(),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

File diff suppressed because it is too large Load Diff

View File

@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) ->
Conn4 = open_unmanaged_connection(Config, B),
Conn5 = open_unmanaged_connection(Config, C),
Conn6 = open_unmanaged_connection(Config, C),
?assertEqual(6, count_connections_in(Config, VHost)),
wait_for_count_connections_in(Config, VHost, 6, 60000),
%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) ->
timer:sleep(?DELAY),
%% A and C are still connected, so 4 connections are tracked
?assertEqual(4, count_connections_in(Config, VHost)),
wait_for_count_connections_in(Config, VHost, 4, 60000),
rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),
%% during autoheal B's connections were dropped
?assertEqual(4, count_connections_in(Config, VHost)),
wait_for_count_connections_in(Config, VHost, 4, 60000),
lists:foreach(fun (Conn) ->
(catch rabbit_ct_client_helpers:close_connection(Conn))
@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) ->
passed.
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 ->
?assertEqual(Expected, count_connections_in(Config, VHost));
wait_for_count_connections_in(Config, VHost, Expected, Time) ->
case count_connections_in(Config, VHost) of
Expected ->
ok;
_ ->
Sleep = 3000,
timer:sleep(Sleep),
wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep)
end.
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->

View File

@ -345,7 +345,7 @@ start_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@ -361,7 +361,7 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -422,13 +422,13 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@ -447,7 +447,7 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@ -526,7 +526,7 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@ -568,7 +568,7 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@ -1265,7 +1265,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children,
[ra_server_sup])
[ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@ -1302,7 +1302,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@ -1966,7 +1966,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@ -2237,7 +2237,8 @@ wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 0) ->
?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
Number);
wait_for_cleanup(Server, Channel, Number, N) ->
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
Length when Number == Length ->

View File

@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup:remove_all(),
ra_server_sup_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),

View File

@ -259,7 +259,8 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
_ ->
T
end;
handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
handle_op({input_event, Settlement}, #t{effects = Effs,
down = Down} = T) ->
case queue:out(Effs) of
{{value, {settle, MsgIds, CId}}, Q} ->
Cmd = case Settlement of
@ -269,7 +270,14 @@ handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
end,
do_apply(Cmd, T#t{effects = Q});
{{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
do_apply(Cmd, T#t{effects = Q});
case maps:is_key(element(2, Cmd), Down) of
true ->
%% enqueues cannot arrive after down for the same process
%% drop message
T#t{effects = Q};
false ->
do_apply(Cmd, T#t{effects = Q})
end;
_ ->
T
end;

View File

@ -0,0 +1,286 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
-module(single_active_consumer_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(export_all).
all() ->
[
{group, classic_queue}, {group, quorum_queue}
].
groups() ->
[
{classic_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
]},
{quorum_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(classic_queue, Config) ->
[{single_active_consumer_queue_declare,
#'queue.declare'{arguments = [
{<<"x-single-active-consumer">>, bool, true},
{<<"x-queue-type">>, longstr, <<"classic">>}
],
auto_delete = true}
} | Config];
init_per_group(quorum_queue, Config) ->
[{single_active_consumer_queue_declare,
#'queue.declare'{arguments = [
{<<"x-single-active-consumer">>, bool, true},
{<<"x-queue-type">>, longstr, <<"quorum">>}
],
durable = true, exclusive = false, auto_delete = false}
} | Config].
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
all_messages_go_to_one_consumer(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
MessageCount = 5,
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag2} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)],
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
?assertEqual(MessageCount, MessageCount),
?assertEqual(2, maps:size(MessagesPerConsumer)),
?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)),
?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
after 1000 ->
throw(failed)
end,
amqp_connection:close(C),
ok.
fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
MessageCount = 10,
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag2} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag3} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
{MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
?assertEqual(1, length(FirstActiveConsumerInList)),
FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
{MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
SecondActiveConsumerInList = maps:keys(maps:filter(
fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
MessagesPerConsumer2)
),
?assertEqual(1, length(SecondActiveConsumerInList)),
SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
wait_for_messages(1),
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
?assertEqual(MessageCount, MessageCount),
?assertEqual(3, maps:size(MessagesPerConsumer)),
?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
after 1000 ->
throw(failed)
end,
amqp_connection:close(C),
ok.
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
{C1, Ch1} = connection_and_channel(Config),
{C2, Ch2} = connection_and_channel(Config),
{C3, Ch3} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
MessageCount = 10,
Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]),
Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid),
#'basic.consume_ok'{} =
amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid),
#'basic.consume_ok'{} =
amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
{MessagesPerConsumer1, MessageCount1} = consume_results(),
?assertEqual(MessageCount div 2, MessageCount1),
?assertEqual(1, maps:size(MessagesPerConsumer1)),
?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)),
ok = amqp_channel:close(Ch1),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
{MessagesPerConsumer2, MessageCount2} = consume_results(),
?assertEqual(MessageCount div 2 - 1, MessageCount2),
?assertEqual(1, maps:size(MessagesPerConsumer2)),
ok = amqp_channel:close(Ch2),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}),
{MessagesPerConsumer3, MessageCount3} = consume_results(),
?assertEqual(1, MessageCount3),
?assertEqual(1, maps:size(MessagesPerConsumer3)),
[amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
ok.
amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
?assertExit(
{{shutdown, {server_initiated_close, 403, _}}, _},
amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
),
amqp_connection:close(C),
ok.
connection_and_channel(Config) ->
C = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, Ch} = amqp_connection:open_channel(C),
{C, Ch}.
queue_declare(Channel, Config) ->
Declare = ?config(single_active_consumer_queue_declare, Config),
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
Q.
consume({Parent, State, 0}) ->
Parent ! {consumer_done, State};
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown});
{#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} ->
Parent ! {consumer_done,
{maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
MessageCount + 1}};
{#'basic.deliver'{consumer_tag = CTag}, _Content} ->
NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
MessageCount + 1},
Parent ! {message, NewState},
consume({Parent, NewState, CountDown - 1});
#'basic.cancel_ok'{consumer_tag = CTag} ->
Parent ! {cancel_ok, CTag},
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
_ ->
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
after 10000 ->
Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}},
exit(consumer_timeout)
end.
consume_results() ->
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
{MessagesPerConsumer, MessageCount};
{consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
{MessagesPerConsumer, MessageCount};
_ ->
consume_results()
after 1000 ->
throw(failed)
end.
wait_for_messages(ExpectedCount) ->
wait_for_messages(ExpectedCount, {}).
wait_for_messages(0, State) ->
State;
wait_for_messages(ExpectedCount, _) ->
receive
{message, {MessagesPerConsumer, MessageCount}} ->
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
after 5000 ->
throw(message_waiting_timeout)
end.
wait_for_cancel_ok() ->
receive
{cancel_ok, CTag} ->
{cancel_ok, CTag}
after 5000 ->
throw(consumer_cancel_ok_timeout)
end.

View File

@ -25,6 +25,7 @@
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
@ -60,10 +61,16 @@ groups() ->
topic_matching,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
{max_length_mirrored, [], MaxLengthTests}]}
{max_length_mirrored, [], MaxLengthTests}]},
max_message_size
]}
].
suite() ->
[
{timetrap, {minutes, 3}}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
gen_binary_mb(N) ->
B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
<< B1M || _ <- lists:seq(1, N) >>.
assert_channel_alive(Ch) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
#amqp_msg{payload = <<"HI">>}).
assert_channel_fail_max_size(Ch, Monitor) ->
receive
{'DOWN', Monitor, process, Ch,
{shutdown,
{server_initiated_close, 406, _Error}}} ->
ok
after ?TIMEOUT_CHANNEL_EXCEPTION ->
error({channel_exception_expected, max_message_size})
end.
max_message_size(Config) ->
Binary2M = gen_binary_mb(2),
Binary4M = gen_binary_mb(4),
Binary6M = gen_binary_mb(6),
Binary10M = gen_binary_mb(10),
Size2Mb = 1024 * 1024 * 2,
Size2Mb = byte_size(Binary2M),
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
%% Binary is whithin the max size limit
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
%% The channel process is alive
assert_channel_alive(Ch),
Monitor = monitor(process, Ch),
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
assert_channel_fail_max_size(Ch, Monitor),
%% increase the limit
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
{_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
assert_channel_alive(Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
assert_channel_alive(Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
assert_channel_alive(Ch1),
Monitor1 = monitor(process, Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
assert_channel_fail_max_size(Ch1, Monitor1),
%% increase beyond the hard limit
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
Val = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_channel, get_max_message_size, []),
?assertEqual(?MAX_MSG_SIZE, Val).
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------

View File

@ -0,0 +1,102 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
-module(unit_queue_consumers_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
all() ->
[
is_same,
get_consumer,
get
].
is_same(_Config) ->
?assertEqual(
true,
rabbit_queue_consumers:is_same(
self(), <<"1">>,
consumer(self(), <<"1">>)
)),
?assertEqual(
false,
rabbit_queue_consumers:is_same(
self(), <<"1">>,
consumer(self(), <<"2">>)
)),
Pid = spawn(?MODULE, function_for_process, []),
Pid ! whatever,
?assertEqual(
false,
rabbit_queue_consumers:is_same(
self(), <<"1">>,
consumer(Pid, <<"1">>)
)),
ok.
get(_Config) ->
Pid = spawn(?MODULE, function_for_process, []),
Pid ! whatever,
State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
{Pid, {consumer, <<"2">>, _, _, _, _}} =
rabbit_queue_consumers:get(Pid, <<"2">>, State),
?assertEqual(
undefined,
rabbit_queue_consumers:get(self(), <<"2">>, State)
),
?assertEqual(
undefined,
rabbit_queue_consumers:get(Pid, <<"1">>, State)
),
ok.
get_consumer(_Config) ->
Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []),
Pid ! whatever,
State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
{_Pid, {consumer, _, _, _, _, _}} =
rabbit_queue_consumers:get_consumer(State),
?assertEqual(
undefined,
rabbit_queue_consumers:get_consumer(state(consumers([])))
),
ok.
consumers([]) ->
priority_queue:new();
consumers(Consumers) ->
consumers(Consumers, priority_queue:new()).
consumers([H], Q) ->
priority_queue:in(H, Q);
consumers([H | T], Q) ->
consumers(T, priority_queue:in(H, Q)).
consumer(Pid, ConsumerTag) ->
{Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}.
state(Consumers) ->
{state, Consumers, {}}.
function_for_process() ->
receive
_ -> ok
end.