Merge branch 'master' into rabbitmq-common-198

This commit is contained in:
Michael Klishin 2017-06-27 10:17:55 +03:00
commit 7919af892e
42 changed files with 1630 additions and 7017 deletions

View File

@ -1,27 +1,15 @@
# vim:sw=2:et:
# Use a real VM so we can install all the packages we want.
sudo: required
sudo: false
language: erlang
notifications:
email:
- alerts@rabbitmq.com
addons:
apt:
sources:
- sourceline: deb https://packages.erlang-solutions.com/ubuntu precise contrib
key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
# Use Elixir from Erlang Solutions. The provided Elixir is
# installed with kiex but is old. By using an prebuilt Debian
# package, we save the compilation time.
- elixir
- xsltproc
otp_release:
- "18.3"
- "19.0"
- "19.2"
- "19.3"
before_script:
# The checkout made by Travis is a "detached HEAD" and branches
@ -35,11 +23,12 @@ before_script:
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream stable:stable || :
git fetch upstream master:master || :
# Remove all kiex installations. This makes sure that the Erlang
# Solutions one is picked: it's after the kiex installations in $PATH.
- echo YES | kiex implode
- kiex selfupdate
- test -x ~/.kiex/elixirs/elixir-1.4.4/bin/elixir || kiex install 1.4.4
- kiex default 1.4.4
script: make tests
cache:
apt: true
directories:
- ~/.kiex/elixirs

4
deps/rabbit_common/README.md vendored Normal file
View File

@ -0,0 +1,4 @@
# RabbitMQ Common
This library is shared between [RabbitMQ server](https://github.com/rabbitmq/rabbitmq-server), [RabbitMQ Erlang client](https://github.com/rabbitmq/rabbitmq-erlang-client)
and other RabbitMQ ecosystem projects.

View File

@ -0,0 +1 @@
-define(LAGER_SINK, rabbit_log_lager_event).

View File

@ -46,6 +46,7 @@ dep_rabbitmq_auth_backend_cache = git_rmq rabbitmq-auth-backend-cache $(cu
dep_rabbitmq_auth_backend_http = git_rmq rabbitmq-auth-backend-http $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_clusterer = git_rmq rabbitmq-clusterer $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master
@ -72,6 +73,11 @@ dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(cur
dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_objc_client = git_rmq rabbitmq-objc-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_aws = git_rmq rabbitmq-peer-discovery-aws $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_common = git_rmq rabbitmq-peer-discovery-common $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_consul = git_rmq rabbitmq-peer-discovery-consul $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_etcd = git_rmq rabbitmq-peer-discovery-etcd $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_k8s = git_rmq rabbitmq-peer-discovery-k8s $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_recent_history_exchange = git_rmq rabbitmq-recent-history-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_routing_node_stamp = git_rmq rabbitmq-routing-node-stamp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_rtopic_exchange = git_rmq rabbitmq-rtopic-exchange $(current_rmq_ref) $(base_rmq_ref) master
@ -117,6 +123,7 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_auth_backend_http \
rabbitmq_auth_backend_ldap \
rabbitmq_auth_mechanism_ssl \
rabbitmq_aws \
rabbitmq_boot_steps_visualiser \
rabbitmq_clusterer \
rabbitmq_cli \
@ -143,6 +150,11 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_metronome \
rabbitmq_mqtt \
rabbitmq_objc_client \
rabbitmq_peer_discovery_aws \
rabbitmq_peer_discovery_common \
rabbitmq_peer_discovery_consul \
rabbitmq_peer_discovery_etcd \
rabbitmq_peer_discovery_k8s \
rabbitmq_recent_history_exchange \
rabbitmq_routing_node_stamp \
rabbitmq_rtopic_exchange \

View File

@ -1,3 +1,20 @@
# --------------------------------------------------------------------
# xref
# --------------------------------------------------------------------
ifneq ($(PROJECT),rabbit_common)
XREFR_ARGS := --config $(DEPS_DIR)/rabbit_common/xref.config
endif
# --------------------------------------------------------------------
# %-on-concourse dependencies.
# --------------------------------------------------------------------
ifneq ($(words $(filter %-on-concourse,$(MAKECMDGOALS))),0)
TEST_DEPS += ci $(RMQ_CI_CT_HOOKS)
dep_ci = git git@github.com:rabbitmq/rabbitmq-ci master
endif
# --------------------------------------------------------------------
# Common Test flags.
# --------------------------------------------------------------------

View File

@ -2,3 +2,65 @@
ct-slow ct-fast:
$(MAKE) ct CT_SUITES='$(CT_SUITES)'
# --------------------------------------------------------------------
# Helpers to run Make targets on Concourse.
# --------------------------------------------------------------------
FLY ?= fly
FLY_TARGET ?= $(shell $(FLY) targets | awk '/ci-aws\.rabbitmq\.com/ { print $$1; }')
CONCOURSE_TASK = $(ERLANG_MK_TMP)/concourse-task.yaml
CI_DIR ?= $(DEPS_DIR)/ci
PIPELINE_DIR = $(CI_DIR)/server-release
BRANCH_RELEASE = $(shell "$(PIPELINE_DIR)/scripts/map-branch-to-release.sh" "$(base_rmq_ref)")
PIPELINE_DATA = $(PIPELINE_DIR)/release-data-$(BRANCH_RELEASE).yaml
REPOSITORY_NAME = $(shell "$(PIPELINE_DIR)/scripts/map-erlang-app-and-repository-name.sh" "$(PIPELINE_DATA)" "$(PROJECT)")
CONCOURSE_PLATFORM ?= linux
ERLANG_VERSION ?= $(shell "$(PIPELINE_DIR)/scripts/list-erlang-versions.sh" "$(PIPELINE_DATA)" | head -n 1)
TASK_INPUTS = $(shell "$(PIPELINE_DIR)/scripts/list-task-inputs.sh" "$(CONCOURSE_TASK)")
.PHONY: $(CONCOURSE_TASK)
$(CONCOURSE_TASK): $(ERLANG_MK_RECURSIVE_TEST_DEPS_LIST)
$(gen_verbose) echo 'platform: $(CONCOURSE_PLATFORM)' > "$@"
$(verbose) echo 'inputs:' >> "$@"
$(verbose) echo ' - name: $(PROJECT)' >> "$@"
$(verbose) cat $(ERLANG_MK_RECURSIVE_TEST_DEPS_LIST) | while read -r file; do \
echo " - name: $$(basename "$$file")" >> "$@"; \
done
$(verbose) echo 'outputs:' >> "$@"
$(verbose) echo ' - name: test-output' >> "$@"
ifeq ($(CONCOURSE_PLATFORM),linux)
$(verbose) echo 'image_resource:' >> "$@"
$(verbose) echo ' type: docker-image' >> "$@"
$(verbose) echo ' source:' >> "$@"
$(verbose) echo ' repository: pivotalrabbitmq/rabbitmq-server-buildenv' >> "$@"
$(verbose) echo ' tag: linux-erlang-$(ERLANG_VERSION)' >> "$@"
endif
$(verbose) echo 'run:' >> "$@"
$(verbose) echo ' path: ci/server-release/scripts/test-erlang-app.sh' >> "$@"
$(verbose) echo ' args:' >> "$@"
$(verbose) echo " - $(PROJECT)" >> "$@"
# This section must be the last because the `%-on-concourse` target
# appends other variables.
$(verbose) echo 'params:' >> "$@"
ifdef V
$(verbose) echo ' V: "$(V)"' >> "$@"
endif
ifdef t
$(verbose) echo ' t: "$(t)"' >> "$@"
endif
%-on-concourse: $(CONCOURSE_TASK)
$(verbose) test -d "$(PIPELINE_DIR)"
$(verbose) echo ' MAKE_TARGET: "$*"' >> "$(CONCOURSE_TASK)"
$(FLY) -t $(FLY_TARGET) execute \
--config "$(CONCOURSE_TASK)" \
$(foreach input,$(TASK_INPUTS), \
$(if $(filter $(PROJECT),$(input)), \
--input="$(input)=.", \
--input="$(input)=$(DEPS_DIR)/$(input)")) \
--output="test-output=$(CT_LOGS_DIR)/on-concourse"
$(verbose) rm -f "$(CT_LOGS_DIR)/on-concourse/filename"

View File

@ -15,7 +15,8 @@
%%
-module(app_utils).
-export([load_applications/1, start_applications/1, start_applications/2,
-export([load_applications/1,
start_applications/1, start_applications/2, start_applications/3,
stop_applications/1, stop_applications/2, app_dependency_order/2,
app_dependencies/1]).
@ -49,8 +50,11 @@ stop_applications(Apps) ->
end).
start_applications(Apps, ErrorHandler) ->
start_applications(Apps, ErrorHandler, #{}).
start_applications(Apps, ErrorHandler, AppModes) ->
manage_applications(fun lists:foldl/3,
fun start/1,
fun(App) -> start(App, AppModes) end,
fun application:stop/1,
already_started,
ErrorHandler,
@ -62,7 +66,7 @@ stop_applications(Apps, ErrorHandler) ->
rabbit_log:info("Stopping application '~s'", [App]),
application:stop(App)
end,
fun start/1,
fun(App) -> start(App, #{}) end,
not_started,
ErrorHandler,
Apps).
@ -124,9 +128,11 @@ manage_applications(Iterate, Do, Undo, SkipError, ErrorHandler, Apps) ->
end, [], Apps),
ok.
start(rabbit) ->
%% Stops the Erlang VM when the rabbit application stops abnormally
%% i.e. message store reaches its restart limit
application:start(rabbit, transient);
start(App) ->
application:start(App).
start(App, Modes) ->
Mode = maps:get(App, Modes, default_mode(App)),
application:start(App, Mode).
%% Stops the Erlang VM when the rabbit application stops abnormally
%% i.e. message store reaches its restart limit
default_mode(rabbit) -> transient;
default_mode(_) -> temporary.

View File

@ -1451,7 +1451,7 @@ notify_age(CStates, AverageAge) ->
notify_age0(Clients, CStates, Required) ->
case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
[] -> ok;
Notifications -> S = rand_compat:uniform(length(Notifications)),
Notifications -> S = rand:uniform(length(Notifications)),
{L1, L2} = lists:split(S, Notifications),
notify(Clients, Required, L2 ++ L1)
end.

73
deps/rabbit_common/src/mnesia_sync.erl vendored Normal file
View File

@ -0,0 +1,73 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(mnesia_sync).
%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk
%% at commit. This module is an attempt to minimise the risk of data loss by
%% performing a coalesced log fsync. Unfortunately this is performed regardless
%% of whether or not the log was appended to.
-behaviour(gen_server).
-export([sync/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {waiting, disc_node}).
%%----------------------------------------------------------------------------
-spec sync() -> 'ok'.
%%----------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
sync() ->
gen_server:call(?SERVER, sync, infinity).
%%----------------------------------------------------------------------------
init([]) ->
{ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}.
handle_call(sync, _From, #state{disc_node = false} = State) ->
{reply, ok, State};
handle_call(sync, From, #state{waiting = Waiting} = State) ->
{noreply, State#state{waiting = [From | Waiting]}, 0};
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
handle_cast(Request, State) ->
{stop, {unhandled_cast, Request}, State}.
handle_info(timeout, #state{waiting = Waiting} = State) ->
ok = disk_log:sync(latest_log),
_ = [gen_server:reply(From, ok) || From <- Waiting],
{noreply, State#state{waiting = []}};
handle_info(Message, State) ->
{stop, {unhandled_info, Message}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,23 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_amqp_connection).
-export([amqp_params/2]).
-spec amqp_params(pid(), timeout()) -> [{atom(), term()}].
amqp_params(ConnPid, Timeout) ->
gen_server:call(ConnPid, {info, [amqp_params]}, Timeout).

File diff suppressed because it is too large Load Diff

View File

@ -1,590 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
-include("rabbit.hrl").
-behaviour(rabbit_authn_backend).
-behaviour(rabbit_authz_backend).
-export([user_login_authentication/2, user_login_authorization/1,
check_vhost_access/3, check_resource_access/3, check_topic_access/4]).
-export([add_user/3, delete_user/2, lookup_user/1,
change_password/3, clear_password/2,
hash_password/2, change_password_hash/2, change_password_hash/3,
set_tags/3, set_permissions/6, clear_permissions/3,
set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4,
add_user_sans_validation/3]).
-export([user_info_keys/0, perms_info_keys/0,
user_perms_info_keys/0, vhost_perms_info_keys/0,
user_vhost_perms_info_keys/0,
list_users/0, list_users/2, list_permissions/0,
list_user_permissions/1, list_user_permissions/3,
list_topic_permissions/0,
list_vhost_permissions/1, list_vhost_permissions/3,
list_user_vhost_permissions/2,
list_user_topic_permissions/1, list_vhost_topic_permissions/1, list_user_vhost_topic_permissions/2]).
%% for testing
-export([hashing_module_for_user/1]).
%%----------------------------------------------------------------------------
-type regexp() :: binary().
-spec add_user(rabbit_types:username(), rabbit_types:password(),
rabbit_types:username()) -> 'ok' | {'error', string()}.
-spec delete_user(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
-spec lookup_user
(rabbit_types:username()) ->
rabbit_types:ok(rabbit_types:internal_user()) |
rabbit_types:error('not_found').
-spec change_password
(rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'.
-spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
-spec hash_password
(module(), rabbit_types:password()) -> rabbit_types:password_hash().
-spec change_password_hash
(rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'.
-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
-spec set_permissions
(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
regexp(), rabbit_types:username()) ->
'ok'.
-spec clear_permissions
(rabbit_types:username(), rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec user_info_keys() -> rabbit_types:info_keys().
-spec perms_info_keys() -> rabbit_types:info_keys().
-spec user_perms_info_keys() -> rabbit_types:info_keys().
-spec vhost_perms_info_keys() -> rabbit_types:info_keys().
-spec user_vhost_perms_info_keys() -> rabbit_types:info_keys().
-spec list_users() -> [rabbit_types:infos()].
-spec list_users(reference(), pid()) -> 'ok'.
-spec list_permissions() -> [rabbit_types:infos()].
-spec list_user_permissions
(rabbit_types:username()) -> [rabbit_types:infos()].
-spec list_user_permissions
(rabbit_types:username(), reference(), pid()) -> 'ok'.
-spec list_vhost_permissions
(rabbit_types:vhost()) -> [rabbit_types:infos()].
-spec list_vhost_permissions
(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
-spec list_user_vhost_permissions
(rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()].
%%----------------------------------------------------------------------------
%% Implementation of rabbit_auth_backend
%% Returns a password hashing module for the user record provided. If
%% there is no information in the record, we consider it to be legacy
%% (inserted by a version older than 3.6.0) and fall back to MD5, the
%% now obsolete hashing function.
hashing_module_for_user(#internal_user{
hashing_algorithm = ModOrUndefined}) ->
rabbit_password:hashing_mod(ModOrUndefined).
user_login_authentication(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
user_login_authentication(Username, AuthProps) ->
case lists:keyfind(password, 1, AuthProps) of
{password, Cleartext} ->
internal_check_user_login(
Username,
fun (#internal_user{
password_hash = <<Salt:4/binary, Hash/binary>>
} = U) ->
Hash =:= rabbit_password:salted_hash(
hashing_module_for_user(U), Salt, Cleartext);
(#internal_user{}) ->
false
end);
false -> exit({unknown_auth_props, Username, AuthProps})
end.
user_login_authorization(Username) ->
case user_login_authentication(Username, []) of
{ok, #auth_user{impl = Impl, tags = Tags}} -> {ok, Impl, Tags};
Else -> Else
end.
internal_check_user_login(Username, Fun) ->
Refused = {refused, "user '~s' - invalid credentials", [Username]},
case lookup_user(Username) of
{ok, User = #internal_user{tags = Tags}} ->
case Fun(User) of
true -> {ok, #auth_user{username = Username,
tags = Tags,
impl = none}};
_ -> Refused
end;
{error, not_found} ->
Refused
end.
check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) ->
case mnesia:dirty_read({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] -> false;
[_R] -> true
end.
check_resource_access(#auth_user{username = Username},
#resource{virtual_host = VHostPath, name = Name},
Permission) ->
case mnesia:dirty_read({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] ->
false;
[#user_permission{permission = P}] ->
PermRegexp = case element(permission_index(Permission), P) of
%% <<"^$">> breaks Emacs' erlang mode
<<"">> -> <<$^, $$>>;
RE -> RE
end,
case re:run(Name, PermRegexp, [{capture, none}]) of
match -> true;
nomatch -> false
end
end.
check_topic_access(#auth_user{username = Username},
#resource{virtual_host = VHostPath, name = Name, kind = topic},
Permission,
Context) ->
case mnesia:dirty_read({rabbit_topic_permission,
#topic_permission_key{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
exchange = Name
}}) of
[] ->
true;
[#topic_permission{permission = P}] ->
PermRegexp = case element(permission_index(Permission), P) of
%% <<"^$">> breaks Emacs' erlang mode
<<"">> -> <<$^, $$>>;
RE -> RE
end,
case re:run(maps:get(routing_key, Context), PermRegexp, [{capture, none}]) of
match -> true;
nomatch -> false
end
end.
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
%%----------------------------------------------------------------------------
%% Manipulation of the user database
validate_credentials(Username, Password) ->
rabbit_credential_validation:validate(Username, Password).
validate_and_alternate_credentials(Username, Password, ActingUser, Fun) ->
case validate_credentials(Username, Password) of
ok ->
Fun(Username, Password, ActingUser);
{error, Err} ->
rabbit_log:error("Credential validation for '~s' failed!~n", [Username]),
{error, Err}
end.
add_user(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun add_user_sans_validation/3).
add_user_sans_validation(Username, Password, ActingUser) ->
rabbit_log:info("Creating user '~s'~n", [Username]),
%% hash_password will pick the hashing function configured for us
%% but we also need to store a hint as part of the record, so we
%% retrieve it here one more time
HashingMod = rabbit_password:hashing_mod(),
User = #internal_user{username = Username,
password_hash = hash_password(HashingMod, Password),
tags = [],
hashing_algorithm = HashingMod},
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_user, Username}) of
[] ->
ok = mnesia:write(rabbit_user, User, write);
_ ->
mnesia:abort({user_already_exists, Username})
end
end),
rabbit_event:notify(user_created, [{name, Username},
{user_who_performed_action, ActingUser}]),
R.
delete_user(Username, ActingUser) ->
rabbit_log:info("Deleting user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user(
Username,
fun () ->
ok = mnesia:delete({rabbit_user, Username}),
[ok = mnesia:delete_object(
rabbit_user_permission, R, write) ||
R <- mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
username = Username,
virtual_host = '_'},
permission = '_'},
write)],
UserTopicPermissionsQuery = match_user_vhost_topic_permission(Username, '_'),
UserTopicPermissions = UserTopicPermissionsQuery(),
[ok = mnesia:delete_object(rabbit_topic_permission, R, write) || R <- UserTopicPermissions],
ok
end)),
rabbit_event:notify(user_deleted,
[{name, Username},
{user_who_performed_action, ActingUser}]),
R.
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
change_password(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun change_password_sans_validation/3).
change_password_sans_validation(Username, Password, ActingUser) ->
rabbit_log:info("Changing password for '~s'~n", [Username]),
HashingAlgorithm = rabbit_password:hashing_mod(),
R = change_password_hash(Username,
hash_password(rabbit_password:hashing_mod(),
Password),
HashingAlgorithm),
rabbit_event:notify(user_password_changed,
[{name, Username},
{user_who_performed_action, ActingUser}]),
R.
clear_password(Username, ActingUser) ->
rabbit_log:info("Clearing password for '~s'~n", [Username]),
R = change_password_hash(Username, <<"">>),
rabbit_event:notify(user_password_cleared,
[{name, Username},
{user_who_performed_action, ActingUser}]),
R.
hash_password(HashingMod, Cleartext) ->
rabbit_password:hash(HashingMod, Cleartext).
change_password_hash(Username, PasswordHash) ->
change_password_hash(Username, PasswordHash, rabbit_password:hashing_mod()).
change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
update_user(Username, fun(User) ->
User#internal_user{
password_hash = PasswordHash,
hashing_algorithm = HashingAlgorithm }
end).
set_tags(Username, Tags, ActingUser) ->
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
rabbit_log:info("Setting user tags for user '~s' to ~p~n",
[Username, ConvertedTags]),
R = update_user(Username, fun(User) ->
User#internal_user{tags = ConvertedTags}
end),
rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
{user_who_performed_action, ActingUser}]),
R.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, ActingUser) ->
rabbit_log:info("Setting permissions for "
"'~s' in '~s' to '~s', '~s', '~s'~n",
[Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
lists:map(
fun (RegexpBin) ->
Regexp = binary_to_list(RegexpBin),
case re:compile(Regexp) of
{ok, _} -> ok;
{error, Reason} -> throw({error, {invalid_regexp,
Regexp, Reason}})
end
end, [ConfigurePerm, WritePerm, ReadPerm]),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () -> ok = mnesia:write(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
permission = #permission{
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
write)
end)),
rabbit_event:notify(permission_created, [{user, Username},
{vhost, VHostPath},
{configure, ConfigurePerm},
{write, WritePerm},
{read, ReadPerm},
{user_who_performed_action, ActingUser}]),
R.
clear_permissions(Username, VHostPath, ActingUser) ->
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
ok = mnesia:delete({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}})
end)),
rabbit_event:notify(permission_deleted, [{user, Username},
{vhost, VHostPath},
{user_who_performed_action, ActingUser}]),
R.
update_user(Username, Fun) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user(
Username,
fun () ->
{ok, User} = lookup_user(Username),
ok = mnesia:write(rabbit_user, Fun(User), write)
end)).
set_topic_permissions(Username, VHostPath, Exchange, WritePerm, ReadPerm, ActingUser) ->
WritePermRegex = rabbit_data_coercion:to_binary(WritePerm),
ReadPermRegex = rabbit_data_coercion:to_binary(ReadPerm),
lists:map(
fun (RegexpBin) ->
case re:compile(RegexpBin) of
{ok, _} -> ok;
{error, Reason} -> throw({error, {invalid_regexp,
RegexpBin, Reason}})
end
end, [WritePerm, ReadPerm]),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () -> ok = mnesia:write(
rabbit_topic_permission,
#topic_permission{
topic_permission_key = #topic_permission_key{
user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
exchange = Exchange
},
permission = #permission{
write = WritePermRegex,
read = ReadPermRegex
}
},
write)
end)),
rabbit_event:notify(topic_permission_created, [
{user, Username},
{vhost, VHostPath},
{exchange, Exchange},
{write, WritePermRegex},
{read, ReadPermRegex},
{user_who_performed_action, ActingUser}]),
R.
clear_topic_permissions(Username, VHostPath, ActingUser) ->
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
ListFunction = match_user_vhost_topic_permission(Username, VHostPath),
List = ListFunction(),
lists:foreach(fun(X) ->
ok = mnesia:delete_object(rabbit_topic_permission, X, write)
end, List)
end)),
rabbit_event:notify(topic_permission_deleted, [{user, Username},
{vhost, VHostPath},
{user_who_performed_action, ActingUser}]),
R.
clear_topic_permissions(Username, VHostPath, Exchange, ActingUser) ->
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
ok = mnesia:delete(rabbit_topic_permission,
#topic_permission_key{
user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
exchange = Exchange
}, write)
end)),
rabbit_event:notify(permission_deleted, [{user, Username},
{vhost, VHostPath},
{user_who_performed_action, ActingUser}]),
R.
%%----------------------------------------------------------------------------
%% Listing
-define(PERMS_INFO_KEYS, [configure, write, read]).
-define(USER_INFO_KEYS, [user, tags]).
user_info_keys() -> ?USER_INFO_KEYS.
perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS].
vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS].
user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS].
user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS.
topic_perms_info_keys() -> [user, vhost, exchange, write, read].
user_topic_perms_info_keys() -> [vhost, exchange, write, read].
vhost_topic_perms_info_keys() -> [user, exchange, write, read].
user_vhost_topic_perms_info_keys() -> [exchange, write, read].
list_users() ->
[extract_internal_user_params(U) ||
U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
list_users(Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
fun(U) -> extract_internal_user_params(U) end,
mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})).
list_permissions() ->
list_permissions(perms_info_keys(), match_user_vhost('_', '_')).
list_permissions(Keys, QueryThunk) ->
[extract_user_permission_params(Keys, U) ||
U <- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
list_permissions(Keys, QueryThunk, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(U) -> extract_user_permission_params(Keys, U) end,
rabbit_misc:execute_mnesia_transaction(QueryThunk)).
filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
list_user_permissions(Username) ->
list_permissions(
user_perms_info_keys(),
rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
list_user_permissions(Username, Ref, AggregatorPid) ->
list_permissions(
user_perms_info_keys(),
rabbit_misc:with_user(Username, match_user_vhost(Username, '_')),
Ref, AggregatorPid).
list_vhost_permissions(VHostPath) ->
list_permissions(
vhost_perms_info_keys(),
rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
list_vhost_permissions(VHostPath, Ref, AggregatorPid) ->
list_permissions(
vhost_perms_info_keys(),
rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath)),
Ref, AggregatorPid).
list_user_vhost_permissions(Username, VHostPath) ->
list_permissions(
user_vhost_perms_info_keys(),
rabbit_misc:with_user_and_vhost(
Username, VHostPath, match_user_vhost(Username, VHostPath))).
extract_user_permission_params(Keys, #user_permission{
user_vhost =
#user_vhost{username = Username,
virtual_host = VHostPath},
permission = #permission{
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}}) ->
filter_props(Keys, [{user, Username},
{vhost, VHostPath},
{configure, ConfigurePerm},
{write, WritePerm},
{read, ReadPerm}]).
extract_internal_user_params(#internal_user{username = Username, tags = Tags}) ->
[{user, Username}, {tags, Tags}].
match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
permission = '_'},
read)
end.
list_topic_permissions() ->
list_topic_permissions(topic_perms_info_keys(), match_user_vhost_topic_permission('_', '_')).
list_user_topic_permissions(Username) ->
list_topic_permissions(user_topic_perms_info_keys(),
rabbit_misc:with_user(Username, match_user_vhost_topic_permission(Username, '_'))).
list_vhost_topic_permissions(VHost) ->
list_topic_permissions(vhost_topic_perms_info_keys(),
rabbit_vhost:with(VHost, match_user_vhost_topic_permission('_', VHost))).
list_user_vhost_topic_permissions(Username, VHost) ->
list_topic_permissions(user_vhost_topic_perms_info_keys(),
rabbit_misc:with_user_and_vhost(Username, VHost, match_user_vhost_topic_permission(Username, VHost))).
list_topic_permissions(Keys, QueryThunk) ->
[extract_topic_permission_params(Keys, U) ||
U <- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
match_user_vhost_topic_permission(Username, VHostPath) ->
match_user_vhost_topic_permission(Username, VHostPath, '_').
match_user_vhost_topic_permission(Username, VHostPath, Exchange) ->
fun () -> mnesia:match_object(
rabbit_topic_permission,
#topic_permission{topic_permission_key = #topic_permission_key{
user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
exchange = Exchange},
permission = '_'},
read)
end.
extract_topic_permission_params(Keys, #topic_permission{
topic_permission_key = #topic_permission_key{
user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
exchange = Exchange},
permission = #permission{
write = WritePerm,
read = ReadPerm}}) ->
filter_props(Keys, [{user, Username},
{vhost, VHostPath},
{exchange, Exchange},
{write, WritePerm},
{read, ReadPerm}]).

View File

@ -30,7 +30,7 @@
%% Something went wrong. Log and die.
%% {refused, Msg, Args}
%% Client failed authentication. Log and die.
-callback user_login_authentication(rabbit_types:username(), [term()]) ->
-callback user_login_authentication(rabbit_types:username(), [term()] | map()) ->
{'ok', rabbit_types:auth_user()} |
{'refused', string(), [any()]} |
{'error', any()}.

View File

@ -1,324 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_basic).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
%%----------------------------------------------------------------------------
-type properties_input() ::
rabbit_framing:amqp_property_record() | [{atom(), any()}].
-type publish_result() ::
{ok, [pid()]} | rabbit_types:error('not_found').
-type header() :: any().
-type headers() :: rabbit_framing:amqp_table() | 'undefined'.
-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name().
-type body_input() :: binary() | [binary()].
-spec publish
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) ->
publish_result().
-spec publish
(exchange_input(), rabbit_router:routing_key(), boolean(),
properties_input(), body_input()) ->
publish_result().
-spec publish(rabbit_types:delivery()) -> publish_result().
-spec delivery
(boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery().
-spec message
(rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
binary()) ->
rabbit_types:message().
-spec message
(rabbit_exchange:name(), rabbit_router:routing_key(),
rabbit_types:decoded_content()) ->
rabbit_types:ok_or_error2(rabbit_types:message(), any()).
-spec properties
(properties_input()) -> rabbit_framing:amqp_property_record().
-spec prepend_table_header
(binary(), rabbit_framing:amqp_table(), headers()) -> headers().
-spec header(header(), headers()) -> 'undefined' | any().
-spec header(header(), headers(), any()) -> 'undefined' | any().
-spec extract_headers(rabbit_types:content()) -> headers().
-spec map_headers
(fun((headers()) -> headers()), rabbit_types:content()) ->
rabbit_types:content().
-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
-spec build_content
(rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
rabbit_types:content().
-spec from_content
(rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}.
-spec parse_expiration
(rabbit_framing:amqp_property_record()) ->
rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
-spec msg_size
(rabbit_types:content() | rabbit_types:message()) -> non_neg_integer().
-spec maybe_gc_large_msg
(rabbit_types:content() | rabbit_types:message()) -> non_neg_integer().
%%----------------------------------------------------------------------------
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
publish(X, delivery(Mandatory, false, Message, undefined));
publish(XName, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
publish(delivery(Mandatory, false, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} -> publish(X, Delivery);
Err -> Err
end.
publish(X, Delivery) ->
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, DeliveredQPids}.
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
build_content(Properties, PFR) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
protocol = none,
payload_fragments_rev = PFR}.
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
rabbit_binary_parser:ensure_content_decoded(Content),
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
%% This breaks the spec rule forbidding message modification
strip_header(#content{properties = #'P_basic'{headers = undefined}}
= DecodedContent, _Key) ->
DecodedContent;
strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
= DecodedContent, Key) ->
case lists:keysearch(Key, 1, Headers) of
false -> DecodedContent;
{value, Found} -> Headers0 = lists:delete(Found, Headers),
rabbit_binary_generator:clear_encoded_content(
DecodedContent#content{
properties = Props#'P_basic'{
headers = Headers0}})
end.
message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
catch
{error, _Reason} = Error -> Error
end.
message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
{ok, Msg} = message(XName, RoutingKey, Content),
Msg.
properties(P = #'P_basic'{}) ->
P;
properties(P) when is_list(P) ->
%% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
%% i.e. slow. Use the definition of 'P_basic' directly if
%% possible!
lists:foldl(fun ({Key, Value}, Acc) ->
case indexof(record_info(fields, 'P_basic'), Key) of
0 -> throw({unknown_basic_property, Key});
N -> setelement(N + 1, Acc, Value)
end
end, #'P_basic'{}, P).
prepend_table_header(Name, Info, undefined) ->
prepend_table_header(Name, Info, []);
prepend_table_header(Name, Info, Headers) ->
case rabbit_misc:table_lookup(Headers, Name) of
{array, Existing} ->
prepend_table(Name, Info, Existing, Headers);
undefined ->
prepend_table(Name, Info, [], Headers);
Other ->
Headers2 = prepend_table(Name, Info, [], Headers),
set_invalid_header(Name, Other, Headers2)
end.
prepend_table(Name, Info, Prior, Headers) ->
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
undefined ->
set_invalid([{Name, array, [Value]}], Headers);
{table, ExistingHdr} ->
update_invalid(Name, Value, ExistingHdr, Headers);
Other ->
%% somehow the x-invalid-headers header is corrupt
Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
end.
set_invalid(NewHdr, Headers) ->
rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
update_invalid(Name, Value, ExistingHdr, Header) ->
Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
undefined -> [Value];
{array, Prior} -> [Value | Prior]
end,
NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
set_invalid(NewHdr, Header).
header(_Header, undefined) ->
undefined;
header(_Header, []) ->
undefined;
header(Header, Headers) ->
header(Header, Headers, undefined).
header(Header, Headers, Default) ->
case lists:keysearch(Header, 1, Headers) of
false -> Default;
{value, Val} -> Val
end.
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
extract_timestamp(Content) ->
#content{properties = #'P_basic'{timestamp = Timestamp}} =
rabbit_binary_parser:ensure_content_decoded(Content),
Timestamp.
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
Headers1 = F(Headers),
rabbit_binary_generator:clear_encoded_content(
Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
indexof(L, Element) -> indexof(L, Element, 1).
indexof([], _Element, _N) -> 0;
indexof([Element | _Rest], Element, N) -> N;
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
1 -> false;
2 -> true;
undefined -> false;
Other -> throw({error, {delivery_mode_unknown, Other}})
end.
%% Extract CC routes from headers
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
lists:append(
[case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
{array, Routes} -> [Route || {longstr, Route} <- Routes];
undefined -> [];
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
parse_expiration(#'P_basic'{expiration = undefined}) ->
{ok, undefined};
parse_expiration(#'P_basic'{expiration = Expiration}) ->
case string:to_integer(binary_to_list(Expiration)) of
{error, no_integer} = E ->
E;
{N, ""} ->
case rabbit_misc:check_expiry(N) of
ok -> {ok, N};
E = {error, _} -> E
end;
{_, S} ->
{error, {leftover_string, S}}
end.
%% Some processes (channel, writer) can get huge amounts of binary
%% garbage when processing huge messages at high speed (since we only
%% do enough reductions to GC every few hundred messages, and if each
%% message is 1MB then that's ugly). So count how many bytes of
%% message we have processed, and force a GC every so often.
maybe_gc_large_msg(Content) ->
Size = msg_size(Content),
Current = case get(msg_size_for_gc) of
undefined -> 0;
C -> C
end,
New = Current + Size,
put(msg_size_for_gc, case New > 1000000 of
true -> erlang:garbage_collect(),
0;
false -> New
end),
Size.
msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
msg_size(#basic_message{content = Content}) -> msg_size(Content).

View File

@ -0,0 +1,50 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_basic_common).
-include("rabbit.hrl").
-export([build_content/2, from_content/1]).
-spec build_content
(rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
rabbit_types:content().
-spec from_content
(rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
build_content(Properties, PFR) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
protocol = none,
payload_fragments_rev = PFR}.
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
rabbit_binary_parser:ensure_content_decoded(Content),
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_channel_common).
-export([do/2, do/3, do_flow/3, ready_for_close/1]).
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content, noflow}).
do_flow(Pid, Method, Content) ->
%% Here we are tracking messages sent by the rabbit_reader
%% process. We are accessing the rabbit_reader process dictionary.
credit_flow:send(Pid),
gen_server2:cast(Pid, {method, Method, Content, flow}).
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).

View File

@ -1,113 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_channel_interceptor).
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-export([init/1, intercept_in/3]).
-behaviour(rabbit_registry_class).
-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]).
-type(method_name() :: rabbit_framing:amqp_method_name()).
-type(original_method() :: rabbit_framing:amqp_method_record()).
-type(processed_method() :: rabbit_framing:amqp_method_record()).
-type(original_content() :: rabbit_types:maybe(rabbit_types:content())).
-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())).
-type(interceptor_state() :: term()).
-callback description() -> [proplists:property()].
%% Derive some initial state from the channel. This will be passed back
%% as the third argument of intercept/3.
-callback init(rabbit_channel:channel()) -> interceptor_state().
-callback intercept(original_method(), original_content(),
interceptor_state()) ->
{processed_method(), processed_content()} |
rabbit_misc:channel_or_connection_exit().
-callback applies_to() -> list(method_name()).
added_to_rabbit_registry(_Type, _ModuleName) ->
rabbit_channel:refresh_interceptors().
removed_from_rabbit_registry(_Type) ->
rabbit_channel:refresh_interceptors().
init(Ch) ->
Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
check_no_overlap(Mods),
[{Mod, Mod:init(Ch)} || Mod <- Mods].
check_no_overlap(Mods) ->
check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]).
%% Check no non-empty pairwise intersection in a list of sets
check_no_overlap1(Sets) ->
lists:foldl(fun(Set, Union) ->
Is = sets:intersection(Set, Union),
case sets:size(Is) of
0 -> ok;
_ ->
internal_error("Interceptor: more than one "
"module handles ~p~n", [Is])
end,
sets:union(Set, Union)
end,
sets:new(),
Sets),
ok.
intercept_in(M, C, Mods) ->
lists:foldl(fun({Mod, ModState}, {M1, C1}) ->
call_module(Mod, ModState, M1, C1)
end,
{M, C},
Mods).
call_module(Mod, St, M, C) ->
% this little dance is because Mod might be unloaded at any point
case (catch {ok, Mod:intercept(M, C, St)}) of
{ok, R} -> validate_response(Mod, M, C, R);
{'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C}
end.
validate_response(Mod, M1, C1, R = {M2, C2}) ->
case {validate_method(M1, M2), validate_content(C1, C2)} of
{true, true} -> R;
{false, _} ->
internal_error("Interceptor: ~p expected to return "
"method: ~p but returned: ~p",
[Mod, rabbit_misc:method_record_type(M1),
rabbit_misc:method_record_type(M2)]);
{_, false} ->
internal_error("Interceptor: ~p expected to return "
"content iff content is provided but "
"content in = ~p; content out = ~p",
[Mod, C1, C2])
end.
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
validate_content(none, none) -> true;
validate_content(#content{}, #content{}) -> true;
validate_content(_, _) -> false.
%% keep dialyzer happy
-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->
rabbit_misc:protocol_error(internal_error, Format, Args).

View File

@ -123,14 +123,15 @@ collect_monitors([]) ->
ok;
collect_monitors([Monitor|Rest]) ->
receive
{'DOWN', Monitor, _Pid, normal} ->
{'DOWN', Monitor, process, _Pid, normal} ->
collect_monitors(Rest);
{'DOWN', Monitor, _Pid, noproc} ->
{'DOWN', Monitor, process, _Pid, noproc} ->
%% There is a link and a monitor to a process. Matching
%% this clause means that process has gracefully
%% terminated even before we've started monitoring.
collect_monitors(Rest);
{'DOWN', _, Pid, Reason} ->
{'DOWN', _, process, Pid, Reason} when Reason =/= normal,
Reason =/= noproc ->
exit({emitter_exit, Pid, Reason})
end.

View File

@ -1,114 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_exchange_decorator).
-include("rabbit.hrl").
-export([select/2, set/1]).
-behaviour(rabbit_registry_class).
-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]).
%% This is like an exchange type except that:
%%
%% 1) It applies to all exchanges as soon as it is installed, therefore
%% 2) It is not allowed to affect validation, so no validate/1 or
%% assert_args_equivalence/2
%%
%% It's possible in the future we might make decorators
%% able to manipulate messages as they are published.
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
%% might still be delivered out of order, but there'll be a
%% serial number).
-callback serialise_events(rabbit_types:exchange()) -> boolean().
%% called after declaration and recovery
-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
%% called after exchange (auto)deletion.
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
%% called when the policy attached to this exchange changes.
-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
'ok'.
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
%% called after bindings have been deleted.
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
%% Allows additional destinations to be added to the routing decision.
-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
[rabbit_amqqueue:name() | rabbit_exchange:name()].
%% Whether the decorator wishes to receive callbacks for the exchange
%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
%%----------------------------------------------------------------------------
added_to_rabbit_registry(_Type, _ModuleName) ->
[maybe_recover(X) || X <- rabbit_exchange:list()],
ok.
removed_from_rabbit_registry(_Type) ->
[maybe_recover(X) || X <- rabbit_exchange:list()],
ok.
%% select a subset of active decorators
select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute);
select(route, {Route, _NoRoute}) -> filter(Route);
select(raw, {Route, NoRoute}) -> Route ++ NoRoute.
filter(Modules) ->
[M || M <- Modules, code:which(M) =/= non_existing].
set(X) ->
Decs = lists:foldl(fun (D, {Route, NoRoute}) ->
ActiveFor = D:active_for(X),
{cons_if_eq(all, ActiveFor, D, Route),
cons_if_eq(noroute, ActiveFor, D, NoRoute)}
end, {[], []}, list()),
X#exchange{decorators = Decs}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
maybe_recover(X = #exchange{name = Name,
decorators = Decs}) ->
#exchange{decorators = Decs1} = set(X),
Old = lists:sort(select(all, Decs)),
New = lists:sort(select(all, Decs1)),
case New of
Old -> ok;
_ -> %% TODO create a tx here for non-federation decorators
[M:create(none, X) || M <- New -- Old],
rabbit_exchange:update_decorators(Name)
end.

View File

@ -1,95 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_health_check).
%% External API
-export([node/1, node/2]).
%% Internal API
-export([local/0]).
-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
-spec local() -> ok | {error_string, string()}.
%%----------------------------------------------------------------------------
%% External functions
%%----------------------------------------------------------------------------
node(Node) ->
%% same default as in CLI
node(Node, 70000).
node(Node, Timeout) ->
rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout).
local() ->
run_checks([list_channels, list_queues, alarms, rabbit_node_monitor]).
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
run_checks([]) ->
ok;
run_checks([C|Cs]) ->
case node_health_check(C) of
ok ->
run_checks(Cs);
Error ->
Error
end.
node_health_check(list_channels) ->
case rabbit_channel:info_local([pid]) of
L when is_list(L) ->
ok;
Other ->
ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
[Other]),
{error_string, ErrorMsg}
end;
node_health_check(list_queues) ->
health_check_queues(rabbit_vhost:list());
node_health_check(rabbit_node_monitor) ->
case rabbit_node_monitor:partitions() of
L when is_list(L) ->
ok;
Other ->
ErrorMsg = io_lib:format("rabbit_node_monitor reports unexpected partitions value: ~p",
[Other]),
{error_string, ErrorMsg}
end;
node_health_check(alarms) ->
case proplists:get_value(alarms, rabbit:status()) of
[] ->
ok;
Alarms ->
ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
{error_string, ErrorMsg}
end.
health_check_queues([]) ->
ok;
health_check_queues([VHost|RestVHosts]) ->
case rabbit_amqqueue:info_local(VHost) of
L when is_list(L) ->
health_check_queues(RestVHosts);
Other ->
ErrorMsg = io_lib:format("list_queues unexpected output for vhost ~s: ~p",
[VHost, Other]),
{error_string, ErrorMsg}
end.

128
deps/rabbit_common/src/rabbit_log.erl vendored Normal file
View File

@ -0,0 +1,128 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_log).
-export([log/3, log/4]).
-export([debug/1, debug/2, debug/3,
info/1, info/2, info/3,
notice/1, notice/2, notice/3,
warning/1, warning/2, warning/3,
error/1, error/2, error/3,
critical/1, critical/2, critical/3,
alert/1, alert/2, alert/3,
emergency/1, emergency/2, emergency/3,
none/1, none/2, none/3]).
-include("rabbit_log.hrl").
%%----------------------------------------------------------------------------
-type category() :: atom().
-spec debug(string()) -> 'ok'.
-spec debug(string(), [any()]) -> 'ok'.
-spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec info(string()) -> 'ok'.
-spec info(string(), [any()]) -> 'ok'.
-spec info(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec notice(string()) -> 'ok'.
-spec notice(string(), [any()]) -> 'ok'.
-spec notice(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec warning(string()) -> 'ok'.
-spec warning(string(), [any()]) -> 'ok'.
-spec warning(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec error(string()) -> 'ok'.
-spec error(string(), [any()]) -> 'ok'.
-spec error(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec critical(string()) -> 'ok'.
-spec critical(string(), [any()]) -> 'ok'.
-spec critical(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec alert(string()) -> 'ok'.
-spec alert(string(), [any()]) -> 'ok'.
-spec alert(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec emergency(string()) -> 'ok'.
-spec emergency(string(), [any()]) -> 'ok'.
-spec emergency(pid() | [tuple()], string(), [any()]) -> 'ok'.
-spec none(string()) -> 'ok'.
-spec none(string(), [any()]) -> 'ok'.
-spec none(pid() | [tuple()], string(), [any()]) -> 'ok'.
%%----------------------------------------------------------------------------
-spec log(category(), lager:log_level(), string()) -> 'ok'.
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'.
log(Category, Level, Fmt, Args) when is_list(Args) ->
Sink = case Category of
default -> ?LAGER_SINK;
_ -> make_internal_sink_name(Category)
end,
lager:log(Sink, Level, self(), Fmt, Args).
make_internal_sink_name(rabbit_log_connection) -> rabbit_log_connection_lager_event;
make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event;
make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event;
make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event;
make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event;
make_internal_sink_name(rabbit_log_upgrade) -> rabbit_log_upgrade_lager_event;
make_internal_sink_name(Category) ->
lager_util:make_internal_sink_name(Category).
debug(Format) -> debug(Format, []).
debug(Format, Args) -> debug(self(), Format, Args).
debug(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, debug, Metadata, Format, Args).
info(Format) -> info(Format, []).
info(Format, Args) -> info(self(), Format, Args).
info(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, info, Metadata, Format, Args).
notice(Format) -> notice(Format, []).
notice(Format, Args) -> notice(self(), Format, Args).
notice(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, notice, Metadata, Format, Args).
warning(Format) -> warning(Format, []).
warning(Format, Args) -> warning(self(), Format, Args).
warning(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, warning, Metadata, Format, Args).
error(Format) -> ?MODULE:error(Format, []).
error(Format, Args) -> ?MODULE:error(self(), Format, Args).
error(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, error, Metadata, Format, Args).
critical(Format) -> critical(Format, []).
critical(Format, Args) -> critical(self(), Format, Args).
critical(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, critical, Metadata, Format, Args).
alert(Format) -> alert(Format, []).
alert(Format, Args) -> alert(self(), Format, Args).
alert(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, alert, Metadata, Format, Args).
emergency(Format) -> emergency(Format, []).
emergency(Format, Args) -> emergency(self(), Format, Args).
emergency(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, emergency, Metadata, Format, Args).
none(Format) -> none(Format, []).
none(Format, Args) -> none(self(), Format, Args).
none(Metadata, Format, Args) ->
lager:log(?LAGER_SINK, none, Metadata, Format, Args).

View File

@ -37,7 +37,7 @@
-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([with_user/2]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
@ -66,7 +66,7 @@
-export([os_cmd/1]).
-export([is_os_process_alive/1]).
-export([gb_sets_difference/2]).
-export([version/0, otp_release/0, which_applications/0]).
-export([version/0, otp_release/0, platform_and_version/0, which_applications/0]).
-export([sequence_error/1]).
-export([check_expiry/1]).
-export([base64url/1]).
@ -173,14 +173,12 @@
-spec is_abnormal_exit(any()) -> boolean().
-spec filter_exit_map(fun ((A) -> B), [A]) -> [B].
-spec with_user(rabbit_types:username(), thunk(A)) -> A.
-spec with_user_and_vhost
(rabbit_types:username(), rabbit_types:vhost(), thunk(A)) -> A.
-spec execute_mnesia_transaction(thunk(A)) -> A.
-spec execute_mnesia_transaction(thunk(A), fun ((A, boolean()) -> B)) -> B.
-spec execute_mnesia_tx_with_tail
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)).
-spec ensure_ok(ok_or_error(), atom()) -> 'ok'.
-spec tcp_name(atom(), inet:ip_address(), rabbit_networking:ip_port()) ->
-spec tcp_name(atom(), inet:ip_address(), rabbit_net:ip_port()) ->
atom().
-spec format_inet_error(atom()) -> string().
-spec upmap(fun ((A) -> B), [A]) -> [B].
@ -240,6 +238,7 @@
-spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set().
-spec version() -> string().
-spec otp_release() -> string().
-spec platform_and_version() -> string().
-spec which_applications() -> [{atom(), string(), string()}].
-spec sequence_error([({'error', any()} | any())]) ->
{'error', any()} | any().
@ -435,7 +434,7 @@ enable_cover(Dirs) ->
end, ok, Dirs).
start_cover(NodesS) ->
{ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]),
{ok, _} = cover:start([rabbit_nodes_common:make(N) || N <- NodesS]),
ok.
report_cover() -> report_cover(["."]).
@ -526,9 +525,6 @@ with_user(Username, Thunk) ->
end
end.
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, rabbit_vhost:with(VHostPath, Thunk)).
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
@ -1028,6 +1024,9 @@ otp_release() ->
erlang:system_info(otp_release)
end.
platform_and_version() ->
string:join(["Erlang/OTP", otp_release()], " ").
%% application:which_applications(infinity) is dangerous, since it can
%% cause deadlocks on shutdown. So we have to use a timeout variant,
%% but w/o creating spurious timeout errors. The timeout value is twice

View File

@ -17,17 +17,18 @@
-module(rabbit_net).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
-include_lib("ssl/src/ssl_api.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
accept_ack/2, unwrap_socket/1, maybe_get_proxy_socket/1]).
tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1]).
%%---------------------------------------------------------------------------
-export_type([socket/0]).
-export_type([socket/0, ip_port/0, hostname/0]).
-type stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
@ -37,6 +38,8 @@
-type socket() :: port() | ssl:sslsocket().
-type opts() :: [{atom(), any()} |
{raw, non_neg_integer(), non_neg_integer(), binary()}].
-type hostname() :: inet:hostname().
-type ip_port() :: inet:port_number().
-type host_or_ip() :: binary() | inet:ip_address().
-spec is_ssl(socket()) -> boolean().
-spec ssl_info(socket()) -> 'nossl' | ok_val_or_error([{atom(), any()}]).
@ -65,18 +68,17 @@
-spec close(socket()) -> ok_or_any_error().
-spec fast_close(socket()) -> ok_or_any_error().
-spec sockname(socket()) ->
ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()}).
ok_val_or_error({inet:ip_address(), ip_port()}).
-spec peername(socket()) ->
ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()}).
ok_val_or_error({inet:ip_address(), ip_port()}).
-spec peercert(socket()) ->
'nossl' | ok_val_or_error(rabbit_ssl:certificate()).
-spec connection_string(socket(), 'inbound' | 'outbound') ->
ok_val_or_error(string()).
-spec socket_ends(socket(), 'inbound' | 'outbound') ->
ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
host_or_ip(), rabbit_networking:ip_port()}).
ok_val_or_error({host_or_ip(), ip_port(),
host_or_ip(), ip_port()}).
-spec is_loopback(socket() | inet:ip_address()) -> boolean().
-spec accept_ack(any(), socket()) -> ok.
-spec unwrap_socket(socket() | ranch_proxy:proxy_socket() | ranch_proxy_ssl:ssl_socket()) -> socket().
%%---------------------------------------------------------------------------
@ -245,9 +247,28 @@ socket_ends(Sock, Direction = inbound) when is_tuple(Sock) ->
maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
maybe_ntoab(Host) -> Host.
tcp_host({0,0,0,0}) ->
hostname();
tcp_host({0,0,0,0,0,0,0,0}) ->
hostname();
tcp_host(IPAddress) ->
case inet:gethostbyaddr(IPAddress) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> rabbit_misc:ntoa(IPAddress)
end.
hostname() ->
{ok, Hostname} = inet:gethostname(),
case inet:gethostbyname(Hostname) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
end.
rdns(Addr) ->
case application:get_env(rabbit, reverse_dns_lookups) of
{ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr));
{ok, true} -> list_to_binary(tcp_host(Addr));
_ -> Addr
end.
@ -268,22 +289,6 @@ is_loopback(_) -> false.
ipv4(AB, CD) -> {AB bsr 8, AB band 255, CD bsr 8, CD band 255}.
accept_ack(Ref, Sock) ->
ok = ranch:accept_ack(Ref),
case tune_buffer_size(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
end,
ok = file_handle_cache:obtain().
tune_buffer_size(Sock) ->
case getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
setopts(Sock, [{buffer, BufSz}]);
Error -> Error
end.
unwrap_socket(Sock) when ?IS_SSL(Sock);
is_port(Sock) ->
Sock;
@ -314,4 +319,4 @@ maybe_get_proxy_socket(Sock) when is_tuple(Sock) ->
undefined
end;
maybe_get_proxy_socket(_Sock) ->
undefined.
undefined.

View File

@ -1,480 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_networking).
%% This module contains various functions that deal with networking,
%% TCP and TLS listeners, and connection information.
%%
%% It also contains a boot step boot/0 that starts networking machinery.
%% This module primarily covers AMQP 0-9-1 but some bits are reused in
%% plugins that provide protocol support, e.g. STOMP or MQTT.
%%
%% Functions in this module take care of normalising TCP listener options,
%% including dual IP stack cases, and starting the AMQP 0-9-1 listener(s).
%%
%% See also tcp_listener_sup and tcp_listener.
-export([boot/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
ensure_ssl/0, fix_ssl_options/1, poodle_check/1]).
-export([tcp_listener_started/4, tcp_listener_stopped/4]).
%% Internal
-export([connections_local/0]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
%% IANA-suggested ephemeral port range is 49152 to 65535
-define(FIRST_TEST_BIND_PORT, 49152).
%%----------------------------------------------------------------------------
-export_type([ip_port/0, hostname/0]).
-type hostname() :: inet:hostname().
-type ip_port() :: inet:port_number().
-type family() :: atom().
-type listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}.
-type address() :: {inet:ip_address(), ip_port(), family()}.
-type name_prefix() :: atom().
-type protocol() :: atom().
-type label() :: string().
-spec start_tcp_listener(listener_config(), integer()) -> 'ok'.
-spec start_ssl_listener
(listener_config(), rabbit_types:infos(), integer()) -> 'ok'.
-spec stop_tcp_listener(listener_config()) -> 'ok'.
-spec active_listeners() -> [rabbit_types:listener()].
-spec node_listeners(node()) -> [rabbit_types:listener()].
-spec register_connection(pid()) -> ok.
-spec unregister_connection(pid()) -> ok.
-spec connections() -> [rabbit_types:connection()].
-spec connections_local() -> [rabbit_types:connection()].
-spec connection_info_keys() -> rabbit_types:info_keys().
-spec connection_info(rabbit_types:connection()) -> rabbit_types:infos().
-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
-spec connection_info_all() -> [rabbit_types:infos()].
-spec connection_info_all(rabbit_types:info_keys()) ->
[rabbit_types:infos()].
-spec close_connection(pid(), string()) -> 'ok'.
-spec force_connection_event_refresh(reference()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
-spec tcp_listener_addresses(listener_config()) -> [address()].
-spec tcp_listener_spec
(name_prefix(), address(), [gen_tcp:listen_option()], module(), module(),
protocol(), any(), non_neg_integer(), label()) ->
supervisor:child_spec().
-spec ensure_ssl() -> rabbit_types:infos().
-spec poodle_check(atom()) -> 'ok' | 'danger'.
-spec boot() -> 'ok'.
-spec tcp_listener_started
(_, _,
string() |
{byte(),byte(),byte(),byte()} |
{char(),char(),char(),char(),char(),char(),char(),char()}, _) ->
'ok'.
-spec tcp_listener_stopped
(_, _,
string() |
{byte(),byte(),byte(),byte()} |
{char(),char(),char(),char(),char(),char(),char(),char()},
_) ->
'ok'.
%%----------------------------------------------------------------------------
boot() ->
ok = record_distribution_listener(),
_ = application:start(ranch),
ok = boot_tcp(application:get_env(rabbit, num_tcp_acceptors, 10)),
ok = boot_ssl(application:get_env(rabbit, num_ssl_acceptors, 1)),
_ = maybe_start_proxy_protocol(),
ok.
boot_tcp(NumAcceptors) ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
[ok = start_tcp_listener(Listener, NumAcceptors) || Listener <- TcpListeners],
ok.
boot_ssl(NumAcceptors) ->
case application:get_env(ssl_listeners) of
{ok, []} ->
ok;
{ok, SslListeners} ->
SslOpts = ensure_ssl(),
case poodle_check('AMQP') of
ok -> [start_ssl_listener(L, SslOpts, NumAcceptors) || L <- SslListeners];
danger -> ok
end,
ok
end.
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
rabbit_ssl_options:fix(SslOptsConfig).
poodle_check(Context) ->
{ok, Vsn} = application:get_key(ssl, vsn),
case rabbit_misc:version_compare(Vsn, "5.3", gte) of %% R16B01
true -> ok;
false -> case application:get_env(rabbit, ssl_allow_poodle_attack) of
{ok, true} -> ok;
_ -> log_poodle_fail(Context),
danger
end
end.
log_poodle_fail(Context) ->
rabbit_log:error(
"The installed version of Erlang (~s) contains the bug OTP-10905,~n"
"which makes it impossible to disable SSLv3. This makes the system~n"
"vulnerable to the POODLE attack. SSL listeners for ~s have therefore~n"
"been disabled.~n~n"
"You are advised to upgrade to a recent Erlang version; R16B01 is the~n"
"first version in which this bug is fixed, but later is usually~n"
"better.~n~n"
"If you cannot upgrade now and want to re-enable SSL listeners, you can~n"
"set the config item 'ssl_allow_poodle_attack' to 'true' in the~n"
"'rabbit' section of your configuration file.~n",
[rabbit_misc:otp_release(), Context]).
maybe_start_proxy_protocol() ->
case application:get_env(rabbit, proxy_protocol, false) of
false -> ok;
true -> application:start(ranch_proxy_protocol)
end.
fix_ssl_options(Config) ->
rabbit_ssl_options:fix(Config).
tcp_listener_addresses(Port) when is_integer(Port) ->
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
tcp_listener_addresses({Host, Port, auto});
tcp_listener_addresses({Host, Port, Family0})
when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
[{IPAddress, Port, Family} ||
{IPAddress, Family} <- getaddr(Host, Family0)];
tcp_listener_addresses({_Host, Port, _Family0}) ->
rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]),
throw({error, {invalid_port, Port}}).
tcp_listener_addresses_auto(Port) ->
lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) ->
{rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
{tcp_listener_sup, start_link,
[IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts,
{?MODULE, tcp_listener_started, [Protocol, SocketOpts]},
{?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]},
NumAcceptors, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}.
start_tcp_listener(Listener, NumAcceptors) ->
start_listener(Listener, NumAcceptors, amqp, "TCP Listener", tcp_opts()).
start_ssl_listener(Listener, SslOpts, NumAcceptors) ->
start_listener(Listener, NumAcceptors, 'amqp/ssl', "SSL Listener", tcp_opts() ++ SslOpts).
start_listener(Listener, NumAcceptors, Protocol, Label, Opts) ->
[start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ||
Address <- tcp_listener_addresses(Listener)],
ok.
start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ->
Transport = transport(Protocol),
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, Opts,
Transport, rabbit_connection_sup, [], Protocol,
NumAcceptors, Label),
case supervisor:start_child(rabbit_sup, Spec) of
{ok, _} -> ok;
{error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
exit({could_not_start_tcp_listener,
{rabbit_misc:ntoa(IPAddress), Port}})
end.
transport(Protocol) ->
ProxyProtocol = application:get_env(rabbit, proxy_protocol, false),
case {Protocol, ProxyProtocol} of
{amqp, false} -> ranch_tcp;
{amqp, true} -> ranch_proxy;
{'amqp/ssl', false} -> ranch_ssl;
{'amqp/ssl', true} -> ranch_proxy_ssl
end.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
tcp_listener_started(Protocol, Opts, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
%% We need the host so we can distinguish multiple instances of the above
%% in a cluster.
ok = mnesia:dirty_write(
rabbit_listener,
#listener{node = node(),
protocol = Protocol,
host = tcp_host(IPAddress),
ip_address = IPAddress,
port = Port,
opts = Opts}).
tcp_listener_stopped(Protocol, Opts, IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
rabbit_listener,
#listener{node = node(),
protocol = Protocol,
host = tcp_host(IPAddress),
ip_address = IPAddress,
port = Port,
opts = Opts}).
record_distribution_listener() ->
{Name, Host} = rabbit_nodes:parts(node()),
{port, Port, _Version} = erl_epmd:port_please(Name, Host),
tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port).
active_listeners() ->
rabbit_misc:dirty_read_all(rabbit_listener).
node_listeners(Node) ->
mnesia:dirty_read(rabbit_listener, Node).
on_node_down(Node) ->
case lists:member(Node, nodes()) of
false ->
rabbit_log:info(
"Node ~s is down, deleting its listeners~n", [Node]),
ok = mnesia:dirty_delete(rabbit_listener, Node);
true ->
rabbit_log:info(
"Keeping ~s listeners: the node is already back~n", [Node])
end.
register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
connection_info(Pid) -> rabbit_reader:info(Pid).
connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids),
ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
connections_local()).
close_connection(Pid, Explanation) ->
rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
end.
force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
hostname();
tcp_host({0,0,0,0,0,0,0,0}) ->
hostname();
tcp_host(IPAddress) ->
case inet:gethostbyaddr(IPAddress) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> rabbit_misc:ntoa(IPAddress)
end.
hostname() ->
{ok, Hostname} = inet:gethostname(),
case inet:gethostbyname(Hostname) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
end.
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
tcp_opts() ->
{ok, ConfigOpts} = application:get_env(rabbit, tcp_listen_options),
ConfigOpts.
%% inet_parse:address takes care of ip string, like "0.0.0.0"
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
%% and runs 'inet_gethost' port process for dns lookups.
%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
getaddr(Host, Family) ->
case inet_parse:address(Host) of
{ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
{error, _} -> gethostaddr(Host, Family)
end.
gethostaddr(Host, auto) ->
Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
[] -> host_lookup_error(Host, Lookups);
IPs -> IPs
end;
gethostaddr(Host, Family) ->
case inet:getaddr(Host, Family) of
{ok, IPAddress} -> [{IPAddress, Family}];
{error, Reason} -> host_lookup_error(Host, Reason)
end.
host_lookup_error(Host, Reason) ->
rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]),
throw({error, {invalid_host, Host, Reason}}).
resolve_family({_,_,_,_}, auto) -> inet;
resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
resolve_family(_, F) -> F.
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
%%
%% * Those which treat IPv4 addresses as a special kind of IPv6 address
%% ("Single stack")
%% - Linux by default, Windows Vista and later
%% - We also treat any (hypothetical?) IPv6-only machine the same way
%% * Those which consider IPv6 and IPv4 to be completely separate things
%% ("Dual stack")
%% - OpenBSD, Windows XP / 2003, Linux if so configured
%% * Those which do not support IPv6.
%% - Ancient/weird OSes, Linux if so configured
%%
%% How to reconfigure Linux to test this:
%% Single stack (default):
%% echo 0 > /proc/sys/net/ipv6/bindv6only
%% Dual stack:
%% echo 1 > /proc/sys/net/ipv6/bindv6only
%% IPv4 only:
%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
%% sudo update-grub && sudo reboot
%%
%% This matters in (and only in) the case where the sysadmin (or the
%% app descriptor) has only supplied a port and we wish to bind to
%% "all addresses". This means different things depending on whether
%% we're single or dual stack. On single stack binding to "::"
%% implicitly includes all IPv4 addresses, and subsequently attempting
%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
%% only bind to IPv6 addresses, and we need another listener bound to
%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
%% want to bind to "0.0.0.0".
%%
%% Unfortunately it seems there is no way to detect single vs dual stack
%% apart from attempting to bind to the port.
port_to_listeners(Port) ->
IPv4 = {"0.0.0.0", Port, inet},
IPv6 = {"::", Port, inet6},
case ipv6_status(?FIRST_TEST_BIND_PORT) of
single_stack -> [IPv6];
ipv6_only -> [IPv6];
dual_stack -> [IPv6, IPv4];
ipv4_only -> [IPv4]
end.
ipv6_status(TestPort) ->
IPv4 = [inet, {ip, {0,0,0,0}}],
IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
case gen_tcp:listen(TestPort, IPv6) of
{ok, LSock6} ->
case gen_tcp:listen(TestPort, IPv4) of
{ok, LSock4} ->
%% Dual stack
gen_tcp:close(LSock6),
gen_tcp:close(LSock4),
dual_stack;
%% Checking the error here would only let us
%% distinguish single stack IPv6 / IPv4 vs IPv6 only,
%% which we figure out below anyway.
{error, _} ->
gen_tcp:close(LSock6),
case gen_tcp:listen(TestPort, IPv4) of
%% Single stack
{ok, LSock4} -> gen_tcp:close(LSock4),
single_stack;
%% IPv6-only machine. Welcome to the future.
{error, eafnosupport} -> ipv6_only; %% Linux
{error, eprotonosupport}-> ipv6_only; %% FreeBSD
%% Dual stack machine with something already
%% on IPv4.
{error, _} -> ipv6_status(TestPort + 1)
end
end;
%% IPv4-only machine. Welcome to the 90s.
{error, eafnosupport} -> %% Linux
ipv4_only;
{error, eprotonosupport} -> %% FreeBSD
ipv4_only;
%% Port in use
{error, _} ->
ipv6_status(TestPort + 1)
end.

View File

@ -1,239 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
cluster_name/0, set_cluster_name/2, ensure_epmd/0,
all_running/0]).
-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
-define(TCP_DIAGNOSTIC_TIMEOUT, 5000).
-define(ERROR_LOGGER_HANDLER, rabbit_error_logger_handler).
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
-spec names(string()) ->
rabbit_types:ok_or_error2([{string(), integer()}], term()).
-spec diagnostics([node()]) -> string().
-spec make({string(), string()} | string()) -> node().
-spec parts(node() | string()) -> {string(), string()}.
-spec cookie_hash() -> string().
-spec is_running(node(), atom()) -> boolean().
-spec is_process_running(node(), atom()) -> boolean().
-spec cluster_name() -> binary().
-spec set_cluster_name(binary(), rabbit_types:username()) -> 'ok'.
-spec ensure_epmd() -> 'ok'.
-spec all_running() -> [node()].
%%----------------------------------------------------------------------------
names(Hostname) ->
Self = self(),
Ref = make_ref(),
{Pid, MRef} = spawn_monitor(
fun () -> Self ! {Ref, net_adm:names(Hostname)} end),
timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
receive
{Ref, Names} -> erlang:demonitor(MRef, [flush]),
Names;
{'DOWN', MRef, process, Pid, Reason} -> {error, Reason}
end.
diagnostics(Nodes) ->
verbose_erlang_distribution(true),
NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
"attempted to contact: ~p~n", [Nodes]}] ++
[diagnostics_node(Node) || Node <- Nodes] ++
current_node_details(),
verbose_erlang_distribution(false),
rabbit_misc:format_many(lists:flatten(NodeDiags)).
verbose_erlang_distribution(true) ->
net_kernel:verbose(1),
error_logger:add_report_handler(?ERROR_LOGGER_HANDLER);
verbose_erlang_distribution(false) ->
net_kernel:verbose(0),
error_logger:delete_report_handler(?ERROR_LOGGER_HANDLER).
current_node_details() ->
[{"~ncurrent node details:~n- node name: ~w", [node()]},
case init:get_argument(home) of
{ok, [[Home]]} -> {"- home dir: ~s", [Home]};
Other -> {"- no home dir: ~p", [Other]}
end,
{"- cookie hash: ~s", [cookie_hash()]}].
diagnostics_node(Node) ->
{Name, Host} = parts(Node),
[{"~s:", [Node]} |
case names(Host) of
{error, Reason} ->
[{" * unable to connect to epmd (port ~s) on ~s: ~s~n",
[epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}];
{ok, NamePorts} ->
[{" * connected to epmd (port ~s) on ~s",
[epmd_port(), Host]}] ++
case net_adm:ping(Node) of
pong -> dist_working_diagnostics(Node);
pang -> dist_broken_diagnostics(Name, Host, NamePorts)
end
end].
epmd_port() ->
case init:get_argument(epmd_port) of
{ok, [[Port | _] | _]} when is_list(Port) -> Port;
error -> "4369"
end.
dist_working_diagnostics(Node) ->
case rabbit:is_running(Node) of
true -> [{" * node ~s up, 'rabbit' application running", [Node]}];
false -> [{" * node ~s up, 'rabbit' application not running~n"
" * running applications on ~s: ~p~n"
" * suggestion: start_app on ~s",
[Node, Node, remote_apps(Node), Node]}]
end.
remote_apps(Node) ->
%% We want a timeout here because really, we don't trust the node,
%% the last thing we want to do is hang.
case rpc:call(Node, application, which_applications, [5000]) of
{badrpc, _} = E -> E;
Apps -> [App || {App, _, _} <- Apps]
end.
dist_broken_diagnostics(Name, Host, NamePorts) ->
case [{N, P} || {N, P} <- NamePorts, N =:= Name] of
[] ->
{SelfName, SelfHost} = parts(node()),
Others = [list_to_atom(N) || {N, _} <- NamePorts,
N =/= case SelfHost of
Host -> SelfName;
_ -> never_matches
end],
OthersDiag = case Others of
[] -> [{" no other nodes on ~s",
[Host]}];
_ -> [{" other nodes on ~s: ~p",
[Host, Others]}]
end,
[{" * epmd reports: node '~s' not running at all", [Name]},
OthersDiag, {" * suggestion: start the node", []}];
[{Name, Port}] ->
[{" * epmd reports node '~s' running on port ~b", [Name, Port]} |
case diagnose_connect(Host, Port) of
ok ->
connection_succeeded_diagnostics();
{error, Reason} ->
[{" * can't establish TCP connection, reason: ~s~n"
" * suggestion: blocked by firewall?",
[rabbit_misc:format_inet_error(Reason)]}]
end]
end.
connection_succeeded_diagnostics() ->
case gen_event:call(error_logger, ?ERROR_LOGGER_HANDLER, get_connection_report) of
[] ->
[{" * TCP connection succeeded but Erlang distribution "
"failed~n"
" * suggestion: hostname mismatch?~n"
" * suggestion: is the cookie set correctly?~n"
" * suggestion: is the Erlang distribution using TLS?", []}];
Report ->
[{" * TCP connection succeeded but Erlang distribution "
"failed~n", []}]
++ Report
end.
diagnose_connect(Host, Port) ->
case inet:gethostbyname(Host) of
{ok, #hostent{h_addrtype = Family}} ->
case gen_tcp:connect(Host, Port, [Family],
?TCP_DIAGNOSTIC_TIMEOUT) of
{ok, Socket} -> gen_tcp:close(Socket),
ok;
{error, _} = E -> E
end;
{error, _} = E ->
E
end.
make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
make(NodeStr) -> make(parts(NodeStr)).
parts(Node) when is_atom(Node) ->
parts(atom_to_list(Node));
parts(NodeStr) ->
case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
{Prefix, []} -> {_, Suffix} = parts(node()),
{Prefix, Suffix};
{Prefix, Suffix} -> {Prefix, tl(Suffix)}
end.
cookie_hash() ->
base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
is_running(Node, Application) ->
case rpc:call(Node, rabbit_misc, which_applications, []) of
{badrpc, _} -> false;
Apps -> proplists:is_defined(Application, Apps)
end.
is_process_running(Node, Process) ->
case rpc:call(Node, erlang, whereis, [Process]) of
{badrpc, _} -> false;
undefined -> false;
P when is_pid(P) -> true
end.
cluster_name() ->
rabbit_runtime_parameters:value_global(
cluster_name, cluster_name_default()).
cluster_name_default() ->
{ID, _} = rabbit_nodes:parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
set_cluster_name(Name, Username) ->
%% Cluster name should be binary
BinaryName = rabbit_data_coercion:to_binary(Name),
rabbit_runtime_parameters:set_global(cluster_name, BinaryName, Username).
ensure_epmd() ->
{ok, Prog} = init:get_argument(progname),
ID = rabbit_misc:random(1000000000),
Port = open_port(
{spawn_executable, os:find_executable(Prog)},
[{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
"-noshell", "-eval", "halt()."]},
exit_status, stderr_to_stdout, use_stdio]),
port_shutdown_loop(Port).
port_shutdown_loop(Port) ->
receive
{Port, {exit_status, _Rc}} -> ok;
{Port, _} -> port_shutdown_loop(Port)
end.
all_running() -> rabbit_mnesia:cluster_nodes(running).

View File

@ -0,0 +1,51 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_nodes_common).
-export([make/1, parts/1, ensure_epmd/0]).
-spec make({string(), string()} | string()) -> node().
-spec parts(node() | string()) -> {string(), string()}.
-spec ensure_epmd() -> 'ok'.
make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
make(NodeStr) -> make(parts(NodeStr)).
parts(Node) when is_atom(Node) ->
parts(atom_to_list(Node));
parts(NodeStr) ->
case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
{Prefix, []} -> {_, Suffix} = parts(node()),
{Prefix, Suffix};
{Prefix, Suffix} -> {Prefix, tl(Suffix)}
end.
ensure_epmd() ->
{ok, Prog} = init:get_argument(progname),
ID = rabbit_misc:random(1000000000),
Port = open_port(
{spawn_executable, os:find_executable(Prog)},
[{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
"-noshell", "-eval", "halt()."]},
exit_status, stderr_to_stdout, use_stdio]),
port_shutdown_loop(Port).
port_shutdown_loop(Port) ->
receive
{Port, {exit_status, _Rc}} -> ok;
{Port, _} -> port_shutdown_loop(Port)
end.

View File

@ -1,3 +1,6 @@
%% This module is based on the autocluster_backend module
%% from rabbitmq-autocluster by Gavin Roy.
%%
%% Copyright (c) 2014-2015 AWeber Communications
%% All rights reserved.
%%
@ -9,7 +12,7 @@
%% * Redistributions in binary form must reproduce the above copyright notice,
%% this list of conditions and the following disclaimer in the documentation
%% and/or other materials provided with the distribution.
%% * Neither the name of the rabbitmq-autocluster-consul plugin nor the names of its
%% * Neither the name of the project nor the names of its
%% contributors may be used to endorse or promote products derived from this
%% software without specific prior written permission.
%%
@ -45,3 +48,7 @@
-callback unregister() -> ok | {error, Reason :: string()}.
-callback post_registration() -> ok | {error, Reason :: string()}.
-callback lock(Node :: atom()) -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}.
-callback unlock(Data :: term()) -> ok | {error, Reason :: string()}.

View File

@ -1,90 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
%% Queue collector keeps track of exclusive queues and cleans them
%% up e.g. when their connection is closed.
-behaviour(gen_server).
-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {monitors, delete_from}).
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-spec start_link(rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error().
-spec register(pid(), pid()) -> 'ok'.
-spec delete_all(pid()) -> 'ok'.
%%----------------------------------------------------------------------------
start_link(ProcName) ->
gen_server:start_link(?MODULE, [ProcName], []).
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
%%----------------------------------------------------------------------------
init([ProcName]) ->
?store_proc_name(ProcName),
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
handle_call({register, QPid}, _From,
State = #state{monitors = QMons, delete_from = Deleting}) ->
case Deleting of
undefined -> ok;
_ -> ok = rabbit_amqqueue:delete_exclusive([QPid], Deleting)
end,
{reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
handle_call(delete_all, From, State = #state{monitors = QMons,
delete_from = undefined}) ->
case pmon:monitored(QMons) of
[] -> {reply, ok, State#state{delete_from = From}};
QPids -> ok = rabbit_amqqueue:delete_exclusive(QPids, From),
{noreply, State#state{delete_from = From}}
end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
State = #state{monitors = QMons, delete_from = Deleting}) ->
QMons1 = pmon:erase(DownPid, QMons),
case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
true -> gen_server:reply(Deleting, ok);
false -> ok
end,
{noreply, State#state{monitors = QMons1}}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,24 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_collector_common).
-export([delete_all/1]).
-spec delete_all(pid()) -> 'ok'.
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).

View File

@ -1,73 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_decorator).
-include("rabbit.hrl").
-export([select/1, set/1, register/2, unregister/1]).
-behaviour(rabbit_registry_class).
-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]).
%%----------------------------------------------------------------------------
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
'ok'.
-callback active_for(rabbit_types:amqqueue()) -> boolean().
%% called with Queue, MaxActivePriority, IsEmpty
-callback consumer_state_changed(
rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
%%----------------------------------------------------------------------------
added_to_rabbit_registry(_Type, _ModuleName) -> ok.
removed_from_rabbit_registry(_Type) -> ok.
select(Modules) ->
[M || M <- Modules, code:which(M) =/= non_existing].
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
register(TypeName, ModuleName) ->
rabbit_registry:register(queue_decorator, TypeName, ModuleName),
[maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
ok.
unregister(TypeName) ->
rabbit_registry:unregister(queue_decorator, TypeName),
[maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
ok.
maybe_recover(Q = #amqqueue{name = Name,
decorators = Decs}) ->
#amqqueue{decorators = Decs1} = set(Q),
Old = lists:sort(select(Decs)),
New = lists:sort(select(Decs1)),
case New of
Old -> ok;
_ -> [M:startup(Q) || M <- New -- Old],
rabbit_amqqueue:update_decorators(Name)
end.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,47 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_resource_monitor_misc).
-export([parse_information_unit/1]).
-spec parse_information_unit(integer() | string()) ->
{ok, integer()} | {error, parse_error}.
parse_information_unit(Value) when is_integer(Value) -> {ok, Value};
parse_information_unit(Value) when is_list(Value) ->
case re:run(Value,
"^(?<VAL>[0-9]+)(?<UNIT>kB|KB|MB|GB|kb|mb|gb|Kb|Mb|Gb|kiB|KiB|MiB|GiB|kib|mib|gib|KIB|MIB|GIB|k|K|m|M|g|G)?$",
[{capture, all_but_first, list}]) of
{match, [[], _]} ->
{ok, list_to_integer(Value)};
{match, [Num]} ->
{ok, list_to_integer(Num)};
{match, [Num, Unit]} ->
Multiplier = case Unit of
KiB when KiB =:= "k"; KiB =:= "kiB"; KiB =:= "K"; KiB =:= "KIB"; KiB =:= "kib" -> 1024;
MiB when MiB =:= "m"; MiB =:= "MiB"; MiB =:= "M"; MiB =:= "MIB"; MiB =:= "mib" -> 1024*1024;
GiB when GiB =:= "g"; GiB =:= "GiB"; GiB =:= "G"; GiB =:= "GIB"; GiB =:= "gib" -> 1024*1024*1024;
KB when KB =:= "KB"; KB =:= "kB"; KB =:= "kb"; KB =:= "Kb" -> 1000;
MB when MB =:= "MB"; MB =:= "mB"; MB =:= "mb"; MB =:= "Mb" -> 1000000;
GB when GB =:= "GB"; GB =:= "gB"; GB =:= "gb"; GB =:= "Gb" -> 1000000000
end,
{ok, list_to_integer(Num) * Multiplier};
nomatch ->
% log error
{error, parse_error}
end.

View File

@ -99,8 +99,8 @@
-type(listener() ::
#listener{node :: node(),
protocol :: atom(),
host :: rabbit_networking:hostname(),
port :: rabbit_networking:ip_port()}).
host :: rabbit_net:hostname(),
port :: rabbit_net:ip_port()}).
-type(binding_source() :: rabbit_exchange:name()).
-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
@ -186,4 +186,6 @@
-type(proc_name() :: term()).
-type(proc_type_and_name() :: {atom(), proc_name()}).
-type(topic_access_context() :: #{routing_key => rabbit_router:routing_key(), _ => _}).
-type(topic_access_context() :: #{routing_key => rabbit_router:routing_key(),
variable_map => map(),
_ => _}).

View File

@ -46,6 +46,7 @@
send_command_flow/2, send_command_flow/3,
flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
-export([msg_size/1, maybe_gc_large_msg/1]).
%% internal
-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
@ -126,6 +127,12 @@
non_neg_integer(), rabbit_types:protocol()) ->
'ok'.
-spec msg_size
(rabbit_types:content() | rabbit_types:message()) -> non_neg_integer().
-spec maybe_gc_large_msg
(rabbit_types:content() | rabbit_types:message()) -> non_neg_integer().
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
@ -228,15 +235,15 @@ handle_message({'$gen_call', From, flush}, State) ->
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
State1 = internal_send_command_async(MethodRecord, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
State1 = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
State1;
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
rabbit_amqqueue_common:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
@ -333,7 +340,7 @@ internal_send_command_async(MethodRecord, Content,
pending = Pending}) ->
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax,
Protocol),
rabbit_basic:maybe_gc_large_msg(Content),
maybe_gc_large_msg(Content),
maybe_flush(State#wstate{pending = [Frames | Pending]}).
%% When the amount of protocol method data buffered exceeds
@ -380,3 +387,25 @@ port_cmd(Sock, Data) ->
catch error:Error -> exit({writer, send_failed, Error})
end,
ok.
%% Some processes (channel, writer) can get huge amounts of binary
%% garbage when processing huge messages at high speed (since we only
%% do enough reductions to GC every few hundred messages, and if each
%% message is 1MB then that's ugly). So count how many bytes of
%% message we have processed, and force a GC every so often.
maybe_gc_large_msg(Content) ->
Size = msg_size(Content),
Current = case get(msg_size_for_gc) of
undefined -> 0;
C -> C
end,
New = Current + Size,
put(msg_size_for_gc, case New > 1000000 of
true -> erlang:garbage_collect(),
0;
false -> New
end),
Size.
msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
msg_size(#basic_message{content = Content}) -> msg_size(Content).

View File

@ -878,7 +878,8 @@ do_restart_delay({RestartType, Delay}, Reason, Child, State) ->
_TRef = erlang:send_after(trunc(Delay*1000), self(),
{delayed_restart,
{{RestartType, Delay}, Reason, Child}}),
{ok, state_del_child(Child, State)}
OldPid = Child#child.pid,
{ok, replace_child(Child#child{pid=restarting(OldPid)}, State)}
end.
restart(Child, State) ->

View File

@ -0,0 +1,540 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
%% of available memory. The pessimistic scenario is when the Erlang VM
%% has a single process that's consuming all memory. In such a case,
%% during garbage collection, Erlang tries to allocate a huge chunk of
%% continuous memory, which can result in a crash or heavy swapping.
%%
%% This module tries to warn Rabbit before such situations occur, so
%% that it has a higher chance to avoid running out of memory.
-module(vm_memory_monitor).
-behaviour(gen_server).
-export([start_link/1, start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([get_total_memory/0, get_vm_limit/0,
get_check_interval/0, set_check_interval/1,
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
get_memory_limit/0, get_memory_use/1,
get_process_memory/0]).
%% for tests
-export([parse_line_linux/1]).
-define(SERVER, ?MODULE).
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-define(ONE_MiB, 1048576).
%% For an unknown OS, we assume that we have 1GB of memory. It'll be
%% wrong. Scale by vm_memory_high_watermark in configuration to get a
%% sensible value.
-define(MEMORY_SIZE_FOR_UNKNOWN_OS, 1073741824).
-define(DEFAULT_VM_MEMORY_HIGH_WATERMARK, 0.4).
-record(state, {total_memory,
memory_limit,
memory_config_limit,
timeout,
timer,
alarmed,
alarm_funs
}).
%%----------------------------------------------------------------------------
-type vm_memory_high_watermark() :: (float() | {'absolute', integer() | string()}).
-spec start_link(float()) -> rabbit_types:ok_pid_or_error().
-spec start_link(float(), fun ((any()) -> 'ok'),
fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error().
-spec get_total_memory() -> (non_neg_integer() | 'unknown').
-spec get_vm_limit() -> non_neg_integer().
-spec get_check_interval() -> non_neg_integer().
-spec set_check_interval(non_neg_integer()) -> 'ok'.
-spec get_vm_memory_high_watermark() -> vm_memory_high_watermark().
-spec set_vm_memory_high_watermark(vm_memory_high_watermark()) -> 'ok'.
-spec get_memory_limit() -> non_neg_integer().
-spec get_memory_use(bytes) -> {non_neg_integer(), float() | infinity};
(ratio) -> float() | infinity.
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
get_total_memory() ->
case application:get_env(rabbit, total_memory_available_override_value) of
{ok, Value} ->
case rabbit_resource_monitor_misc:parse_information_unit(Value) of
{ok, ParsedTotal} ->
ParsedTotal;
{error, parse_error} ->
rabbit_log:warning(
"The override value for the total memmory available is "
"not a valid value: ~p, getting total from the system.~n",
[Value]),
get_total_memory_from_os()
end;
undefined ->
get_total_memory_from_os()
end.
get_vm_limit() -> get_vm_limit(os:type()).
get_check_interval() ->
gen_server:call(?MODULE, get_check_interval, infinity).
set_check_interval(Fraction) ->
gen_server:call(?MODULE, {set_check_interval, Fraction}, infinity).
get_vm_memory_high_watermark() ->
gen_server:call(?MODULE, get_vm_memory_high_watermark, infinity).
set_vm_memory_high_watermark(Fraction) ->
gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction},
infinity).
get_memory_limit() ->
gen_server:call(?MODULE, get_memory_limit, infinity).
get_memory_use(bytes) ->
MemoryLimit = get_memory_limit(),
{get_process_memory(), case MemoryLimit > 0.0 of
true -> MemoryLimit;
false -> infinity
end};
get_memory_use(ratio) ->
MemoryLimit = get_memory_limit(),
case MemoryLimit > 0.0 of
true -> get_process_memory() / MemoryLimit;
false -> infinity
end.
%% Memory reported by erlang:memory(total) is not supposed to
%% be equal to the total size of all pages mapped to the emulator,
%% according to http://erlang.org/doc/man/erlang.html#memory-0
%% erlang:memory(total) under-reports memory usage by around 20%
-spec get_process_memory() -> Bytes :: integer().
get_process_memory() ->
case get_memory_calculation_strategy() of
rss ->
case get_system_process_resident_memory() of
{ok, MemInBytes} ->
MemInBytes;
{error, Reason} ->
rabbit_log:debug("Unable to get system memory used. Reason: ~p."
" Falling back to erlang memory reporting",
[Reason]),
erlang:memory(total)
end;
erlang ->
erlang:memory(total)
end.
-spec get_memory_calculation_strategy() -> rss | erlang.
get_memory_calculation_strategy() ->
case application:get_env(rabbit, vm_memory_calculation_strategy, rss) of
erlang ->
erlang;
rss ->
rss;
UnsupportedValue ->
rabbit_log:warning(
"Unsupported value '~p' for vm_memory_calculation_strategy. "
"Supported values: (rss|erlang). "
"Defaulting to 'rss'",
[UnsupportedValue]
),
rss
end.
-spec get_system_process_resident_memory() -> {ok, Bytes :: integer()} | {error, term()}.
get_system_process_resident_memory() ->
try
get_system_process_resident_memory(os:type())
catch _:Error ->
{error, {"Failed to get process resident memory", Error}}
end.
get_system_process_resident_memory({unix,darwin}) ->
get_ps_memory();
get_system_process_resident_memory({unix, linux}) ->
get_ps_memory();
get_system_process_resident_memory({unix,freebsd}) ->
get_ps_memory();
get_system_process_resident_memory({unix,openbsd}) ->
get_ps_memory();
get_system_process_resident_memory({win32,_OSname}) ->
OsPid = os:getpid(),
Cmd = "wmic process where processid=" ++ OsPid ++ " get WorkingSetSize /value 2>&1",
CmdOutput = os:cmd(Cmd),
%% Memory usage is displayed in bytes
case re:run(CmdOutput, "WorkingSetSize=([0-9]+)", [{capture, all_but_first, binary}]) of
{match, [Match]} ->
{ok, binary_to_integer(Match)};
_ ->
{error, {unexpected_output_from_command, Cmd, CmdOutput}}
end;
get_system_process_resident_memory({unix, sunos}) ->
get_ps_memory();
get_system_process_resident_memory({unix, aix}) ->
get_ps_memory();
get_system_process_resident_memory(_OsType) ->
{error, not_implemented_for_os}.
get_ps_memory() ->
OsPid = os:getpid(),
Cmd = "ps -p " ++ OsPid ++ " -o rss=",
CmdOutput = os:cmd(Cmd),
case re:run(CmdOutput, "[0-9]+", [{capture, first, list}]) of
{match, [Match]} ->
{ok, list_to_integer(Match) * 1024};
_ ->
{error, {unexpected_output_from_command, Cmd, CmdOutput}}
end.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
start_link(MemFraction) ->
start_link(MemFraction,
fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
start_link(MemFraction, AlarmSet, AlarmClear) ->
gen_server:start_link({local, ?SERVER}, ?MODULE,
[MemFraction, {AlarmSet, AlarmClear}], []).
init([MemFraction, AlarmFuns]) ->
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
alarmed = false,
alarm_funs = AlarmFuns },
{ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From,
#state{memory_config_limit = MemLimit} = State) ->
{reply, MemLimit, State};
handle_call({set_vm_memory_high_watermark, MemLimit}, _From, State) ->
{reply, ok, set_mem_limits(State, MemLimit)};
handle_call(get_check_interval, _From, State) ->
{reply, State#state.timeout, State};
handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
handle_call(get_memory_limit, _From, State) ->
{reply, State#state.memory_limit, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(update, State) ->
{noreply, internal_update(State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
%% Server Internals
%%----------------------------------------------------------------------------
get_total_memory_from_os() ->
try
get_total_memory(os:type())
catch _:Error ->
rabbit_log:warning(
"Failed to get total system memory: ~n~p~n~p~n",
[Error, erlang:get_stacktrace()]),
unknown
end.
set_mem_limits(State, MemLimit) ->
case erlang:system_info(wordsize) of
4 ->
error_logger:warning_msg(
"You are using a 32-bit version of Erlang: you may run into "
"memory address~n"
"space exhaustion or statistic counters overflow.~n");
_ ->
ok
end,
TotalMemory =
case get_total_memory() of
unknown ->
case State of
#state { total_memory = undefined,
memory_limit = undefined } ->
error_logger:warning_msg(
"Unknown total memory size for your OS ~p. "
"Assuming memory size is ~p MiB (~p bytes).~n",
[os:type(),
trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/?ONE_MiB),
?MEMORY_SIZE_FOR_UNKNOWN_OS]);
_ ->
ok
end,
?MEMORY_SIZE_FOR_UNKNOWN_OS;
M -> M
end,
UsableMemory =
case get_vm_limit() of
Limit when Limit < TotalMemory ->
error_logger:warning_msg(
"Only ~p MiB (~p bytes) of ~p MiB (~p bytes) memory usable due to "
"limited address space.~n"
"Crashes due to memory exhaustion are possible - see~n"
"http://www.rabbitmq.com/memory.html#address-space~n",
[trunc(Limit/?ONE_MiB), Limit, trunc(TotalMemory/?ONE_MiB),
TotalMemory]),
Limit;
_ ->
TotalMemory
end,
MemLim = interpret_limit(parse_mem_limit(MemLimit), UsableMemory),
error_logger:info_msg("Memory limit set to ~p MiB (~p bytes) of ~p MiB (~p bytes) total.~n",
[trunc(MemLim/?ONE_MiB), MemLim, trunc(TotalMemory/?ONE_MiB),
TotalMemory]),
internal_update(State #state { total_memory = TotalMemory,
memory_limit = MemLim,
memory_config_limit = MemLimit}).
interpret_limit({'absolute', MemLim}, UsableMemory) ->
erlang:min(MemLim, UsableMemory);
interpret_limit(MemFraction, UsableMemory) ->
trunc(MemFraction * UsableMemory).
parse_mem_limit({absolute, Limit}) ->
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
{ok, ParsedLimit} -> {absolute, ParsedLimit};
{error, parse_error} ->
rabbit_log:error("Unable to parse vm_memory_high_watermark value ~p", [Limit]),
?DEFAULT_VM_MEMORY_HIGH_WATERMARK
end;
parse_mem_limit(Relative) when is_float(Relative), Relative < 1 ->
Relative;
parse_mem_limit(_) ->
?DEFAULT_VM_MEMORY_HIGH_WATERMARK.
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed,
alarm_funs = {AlarmSet, AlarmClear} }) ->
MemUsed = get_process_memory(),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} -> emit_update_info(set, MemUsed, MemLimit),
AlarmSet({{resource_limit, memory, node()}, []});
{true, false} -> emit_update_info(clear, MemUsed, MemLimit),
AlarmClear({resource_limit, memory, node()});
_ -> ok
end,
State #state {alarmed = NewAlarmed}.
emit_update_info(AlarmState, MemUsed, MemLimit) ->
error_logger:info_msg(
"vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
[AlarmState, MemUsed, MemLimit]).
start_timer(Timeout) ->
{ok, TRef} = timer:send_interval(Timeout, update),
TRef.
%% According to http://msdn.microsoft.com/en-us/library/aa366778(VS.85).aspx
%% Windows has 2GB and 8TB of address space for 32 and 64 bit accordingly.
get_vm_limit({win32,_OSname}) ->
case erlang:system_info(wordsize) of
4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31
8 -> 8*1024*1024*1024*1024 %% 8 TB for 64 bits 2^42
end;
%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're
%% in big trouble anyway.
get_vm_limit(_OsType) ->
case erlang:system_info(wordsize) of
4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31
8 -> 256*1024*1024*1024*1024 %% 256 TB for 64 bits 2^48
%%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
end.
%%----------------------------------------------------------------------------
%% Internal Helpers
%%----------------------------------------------------------------------------
cmd(Command) ->
Exec = hd(string:tokens(Command, " ")),
case os:find_executable(Exec) of
false -> throw({command_not_found, Exec});
_ -> os:cmd(Command)
end.
%% get_total_memory(OS) -> Total
%% Windows and Freebsd code based on: memsup:get_memory_usage/1
%% Original code was part of OTP and released under "Erlang Public License".
get_total_memory({unix,darwin}) ->
File = cmd("/usr/bin/vm_stat"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)),
[PageSize, Inactive, Active, Free, Wired] =
[dict:fetch(Key, Dict) ||
Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
'Pages wired down']],
PageSize * (Inactive + Active + Free + Wired);
get_total_memory({unix,freebsd}) ->
PageSize = sysctl("vm.stats.vm.v_page_size"),
PageCount = sysctl("vm.stats.vm.v_page_count"),
PageCount * PageSize;
get_total_memory({unix,openbsd}) ->
sysctl("hw.usermem");
get_total_memory({win32,_OSname}) ->
[Result|_] = os_mon_sysinfo:get_mem_info(),
{ok, [_MemLoad, TotPhys, _AvailPhys, _TotPage, _AvailPage, _TotV, _AvailV],
_RestStr} =
io_lib:fread("~d~d~d~d~d~d~d", Result),
TotPhys;
get_total_memory({unix, linux}) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_linux/1, Lines)),
dict:fetch('MemTotal', Dict);
get_total_memory({unix, sunos}) ->
File = cmd("/usr/sbin/prtconf"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
dict:fetch('Memory size', Dict);
get_total_memory({unix, aix}) ->
File = cmd("/usr/bin/vmstat -v"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)),
dict:fetch('memory pages', Dict) * 4096;
get_total_memory(_OsType) ->
unknown.
%% A line looks like "Foo bar: 123456."
parse_line_mach(Line) ->
[Name, RHS | _Rest] = string:tokens(Line, ":"),
case Name of
"Mach Virtual Memory Statistics" ->
["(page", "size", "of", PageSize, "bytes)"] =
string:tokens(RHS, " "),
{page_size, list_to_integer(PageSize)};
_ ->
[Value | _Rest1] = string:tokens(RHS, " ."),
{list_to_atom(Name), list_to_integer(Value)}
end.
%% A line looks like "MemTotal: 502968 kB"
%% or (with broken OS/modules) "Readahead 123456 kB"
parse_line_linux(Line) ->
{Name, Value, UnitRest} =
case string:tokens(Line, ":") of
%% no colon in the line
[S] ->
[K, RHS] = re:split(S, "\s", [{parts, 2}, {return, list}]),
[V | Unit] = string:tokens(RHS, " "),
{K, V, Unit};
[K, RHS | _Rest] ->
[V | Unit] = string:tokens(RHS, " "),
{K, V, Unit}
end,
Value1 = case UnitRest of
[] -> list_to_integer(Value); %% no units
["kB"] -> list_to_integer(Value) * 1024
end,
{list_to_atom(Name), Value1}.
%% A line looks like "Memory size: 1024 Megabytes"
parse_line_sunos(Line) ->
case string:tokens(Line, ":") of
[Name, RHS | _Rest] ->
[Value1 | UnitsRest] = string:tokens(RHS, " "),
Value2 = case UnitsRest of
["Gigabytes"] ->
list_to_integer(Value1) * ?ONE_MiB * 1024;
["Megabytes"] ->
list_to_integer(Value1) * ?ONE_MiB;
["Kilobytes"] ->
list_to_integer(Value1) * 1024;
_ ->
Value1 ++ UnitsRest %% no known units
end,
{list_to_atom(Name), Value2};
[Name] -> {list_to_atom(Name), none}
end.
%% Lines look like " 12345 memory pages"
%% or " 80.1 maxpin percentage"
parse_line_aix(Line) ->
[Value | NameWords] = string:tokens(Line, " "),
Name = string:join(NameWords, " "),
{list_to_atom(Name),
case lists:member($., Value) of
true -> trunc(list_to_float(Value));
false -> list_to_integer(Value)
end}.
sysctl(Def) ->
list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
%% file:read_file does not work on files in /proc as it seems to get
%% the size of the file first and then read that many bytes. But files
%% in /proc always have length 0, we just have to read until we get
%% eof.
read_proc_file(File) ->
{ok, IoDevice} = file:open(File, [read, raw]),
Res = read_proc_file(IoDevice, []),
_ = file:close(IoDevice),
lists:flatten(lists:reverse(Res)).
-define(BUFFER_SIZE, 1024).
read_proc_file(IoDevice, Acc) ->
case file:read(IoDevice, ?BUFFER_SIZE) of
{ok, Res} -> read_proc_file(IoDevice, [Res | Acc]);
eof -> Acc
end.

172
deps/rabbit_common/src/worker_pool.erl vendored Normal file
View File

@ -0,0 +1,172 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(worker_pool).
%% Generic worker pool manager.
%%
%% Submitted jobs are functions. They can be executed synchronously
%% (using worker_pool:submit/1, worker_pool:submit/2) or asynchronously
%% (using worker_pool:submit_async/1).
%%
%% We typically use the worker pool if we want to limit the maximum
%% parallelism of some job. We are not trying to dodge the cost of
%% creating Erlang processes.
%%
%% Supports nested submission of jobs and two execution modes:
%% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in
%% a one-off process. Those executed in 'reuse' mode are invoked in a
%% worker process out of the pool. Nested jobs are always executed
%% immediately in current worker process.
%%
%% 'single' mode is offered to work around a bug in Mnesia: after
%% network partitions reply messages for prior failed requests can be
%% sent to Mnesia clients - a reused worker pool process can crash on
%% receiving one.
%%
%% Caller submissions are enqueued internally. When the next worker
%% process is available, it communicates it to the pool and is
%% assigned a job to execute. If job execution fails with an error, no
%% response is returned to the caller.
%%
%% Worker processes prioritise certain command-and-control messages
%% from the pool.
%%
%% Future improvement points: job prioritisation.
-behaviour(gen_server2).
-export([start_link/1,
submit/1, submit/2, submit/3,
submit_async/1, submit_async/2,
ready/2,
idle/2,
default_pool/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%----------------------------------------------------------------------------
-type mfargs() :: {atom(), atom(), [any()]}.
-spec start_link(atom()) -> {'ok', pid()} | {'error', any()}.
-spec submit(fun (() -> A) | mfargs()) -> A.
-spec submit(fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
-spec submit(atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
-spec submit_async(fun (() -> any()) | mfargs()) -> 'ok'.
-spec ready(atom(), pid()) -> 'ok'.
-spec idle(atom(), pid()) -> 'ok'.
-spec default_pool() -> atom().
%%----------------------------------------------------------------------------
-define(DEFAULT_POOL, ?MODULE).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-record(state, { available, pending }).
%%----------------------------------------------------------------------------
start_link(Name) -> gen_server2:start_link({local, Name}, ?MODULE, [],
[{timeout, infinity}]).
submit(Fun) ->
submit(?DEFAULT_POOL, Fun, reuse).
%% ProcessModel =:= single is for working around the mnesia_locker bug.
submit(Fun, ProcessModel) ->
submit(?DEFAULT_POOL, Fun, ProcessModel).
submit(Server, Fun, ProcessModel) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
_ -> Pid = gen_server2:call(Server, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun, ProcessModel)
end.
submit_async(Fun) -> submit_async(?DEFAULT_POOL, Fun).
submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}).
ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}).
idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}).
default_pool() -> ?DEFAULT_POOL.
%%----------------------------------------------------------------------------
init([]) ->
{ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({next_free, CPid}, From, State = #state { available = [],
pending = Pending }) ->
{noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
hibernate};
handle_call({next_free, CPid}, _From, State = #state { available =
[WPid | Avail1] }) ->
worker_pool_worker:next_job_from(WPid, CPid),
{reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
handle_cast({ready, WPid}, State) ->
erlang:monitor(process, WPid),
handle_cast({idle, WPid}, State);
handle_cast({idle, WPid}, State = #state { available = Avail,
pending = Pending }) ->
{noreply,
case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = ordsets:add_element(WPid, Avail) };
{{value, {next_free, From, CPid}}, Pending1} ->
worker_pool_worker:next_job_from(WPid, CPid),
gen_server2:reply(From, WPid),
State #state { pending = Pending1 };
{{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(WPid, Fun),
State #state { pending = Pending1 }
end, hibernate};
handle_cast({run_async, Fun}, State = #state { available = [],
pending = Pending }) ->
{noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
hibernate};
handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
worker_pool_worker:submit_async(WPid, Fun),
{noreply, State #state { available = Avail1 }, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _MRef, process, WPid, _Reason},
State = #state { available = Avail }) ->
{noreply, State #state { available = ordsets:del_element(WPid, Avail) },
hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, State) ->
State.

View File

@ -0,0 +1,56 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(worker_pool_sup).
-behaviour(supervisor).
-export([start_link/0, start_link/1, start_link/2]).
-export([init/1]).
%%----------------------------------------------------------------------------
-spec start_link() -> rabbit_types:ok_pid_or_error().
-spec start_link(non_neg_integer()) -> rabbit_types:ok_pid_or_error().
-spec start_link(non_neg_integer(), atom())
-> rabbit_types:ok_pid_or_error().
%%----------------------------------------------------------------------------
start_link() ->
start_link(erlang:system_info(schedulers)).
start_link(WCount) ->
start_link(WCount, worker_pool:default_pool()).
start_link(WCount, PoolName) ->
SupName = list_to_atom(atom_to_list(PoolName) ++ "_sup"),
supervisor:start_link({local, SupName}, ?MODULE, [WCount, PoolName]).
%%----------------------------------------------------------------------------
init([WCount, PoolName]) ->
%% we want to survive up to 1K of worker restarts per second,
%% e.g. when a large worker pool used for network connections
%% encounters a network failure. This is the case in the LDAP authentication
%% backend plugin.
{ok, {{one_for_one, 1000, 1},
[{worker_pool, {worker_pool, start_link, [PoolName]}, transient,
16#ffffffff, worker, [worker_pool]} |
[{N, {worker_pool_worker, start_link, [PoolName]}, transient,
16#ffffffff, worker, [worker_pool_worker]}
|| N <- lists:seq(1, WCount)]]}}.

View File

@ -0,0 +1,193 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(worker_pool_worker).
%% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1,
%% worker_pool:submit/2 or worker_pool:submit_async/1.
%%
%% See worker_pool for an overview.
-behaviour(gen_server2).
-export([start_link/1, next_job_from/2, submit/3, submit_async/2,
run/1]).
-export([set_maximum_since_use/2]).
-export([set_timeout/2, set_timeout/3, clear_timeout/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).
%%----------------------------------------------------------------------------
-type mfargs() :: {atom(), atom(), [any()]}.
-spec start_link(atom) -> {'ok', pid()} | {'error', any()}.
-spec next_job_from(pid(), pid()) -> 'ok'.
-spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
-spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'.
-spec run(fun (() -> A)) -> A; (mfargs()) -> any().
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
%%----------------------------------------------------------------------------
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
%%----------------------------------------------------------------------------
start_link(PoolName) ->
gen_server2:start_link(?MODULE, [PoolName], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
submit(Pid, Fun, ProcessModel) ->
gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).
run({M, F, A}) -> apply(M, F, A);
run(Fun) -> Fun().
run(Fun, reuse) ->
run(Fun);
run(Fun, single) ->
Self = self(),
Ref = make_ref(),
spawn_link(fun () ->
put(worker_pool_worker, true),
Self ! {Ref, run(Fun)},
unlink(Self)
end),
receive
{Ref, Res} -> Res
end.
%%----------------------------------------------------------------------------
init([PoolName]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
ok = worker_pool:ready(PoolName, self()),
put(worker_pool_worker, true),
put(worker_pool_name, PoolName),
{ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
{noreply, {job, CPid, From, Fun, ProcessModel}, hibernate};
handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun, ProcessModel)),
ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
{noreply, {from, CPid, MRef}, hibernate};
handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
gen_server2:reply(From, run(Fun, ProcessModel)),
ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
{noreply, State, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
{noreply, State, hibernate};
handle_info({timeout, Key, Fun}, State) ->
clear_timeout(Key),
Fun(),
{noreply, State, hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, State) ->
State.
-spec set_timeout(integer(), fun(() -> any())) -> reference().
set_timeout(Time, Fun) ->
Key = make_ref(),
set_timeout(Key, Time, Fun).
-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any().
set_timeout(Key, Time, Fun) ->
Timeouts = get_timeouts(),
set_timeout(Key, Time, Fun, Timeouts).
-spec clear_timeout(any()) -> ok.
clear_timeout(Key) ->
NewTimeouts = cancel_timeout(Key, get_timeouts()),
put(timeouts, NewTimeouts),
ok.
get_timeouts() ->
case get(timeouts) of
undefined -> dict:new();
Dict -> Dict
end.
set_timeout(Key, Time, Fun, Timeouts) ->
cancel_timeout(Key, Timeouts),
{ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}),
NewTimeouts = dict:store(Key, TRef, Timeouts),
put(timeouts, NewTimeouts),
{ok, Key}.
cancel_timeout(Key, Timeouts) ->
case dict:find(Key, Timeouts) of
{ok, TRef} ->
timer:cancel(TRef),
receive {timeout, Key, _} -> ok
after 0 -> ok
end,
dict:erase(Key, Timeouts);
error ->
Timeouts
end.

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
%% Copyright (c) 2016-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(unit_SUITE).
@ -34,13 +34,24 @@ groups() ->
version_equivalence,
version_minor_equivalence_properties,
version_comparison,
pid_decompose_compose
pid_decompose_compose,
platform_and_version
]}
].
init_per_group(_, Config) -> Config.
end_per_group(_, Config) -> Config.
platform_and_version(_Config) ->
MajorVersion = erlang:system_info(otp_release),
Result = rabbit_misc:platform_and_version(),
RegExp = "^Erlang/OTP\s" ++ MajorVersion,
case re:run(Result, RegExp) of
nomatch -> ct:fail("~p does not match ~p", [Result, RegExp]);
{error, ErrType} -> ct:fail("~p", [ErrType]);
_ -> ok
end.
pid_decompose_compose(_Config) ->
Pid = self(),
{Node, Cre, Id, Ser} = rabbit_misc:decompose_pid(Pid),

14
deps/rabbit_common/xref.config vendored Normal file
View File

@ -0,0 +1,14 @@
% vim:ft=erlang:sw=2:et:
[
{xref, [
{checks, [undefined_function_calls,
undefined_functions,
locals_not_used
%exports_not_used,
%deprecated_function_calls,
%deprecated_functions
]}
]
}
].