Merged bug24671 into default
This commit is contained in:
commit
33a91402a7
|
|
@ -8,8 +8,6 @@
|
|||
|
||||
<xsl:output method="xml" />
|
||||
|
||||
<xsl:template match="*"/>
|
||||
|
||||
<!-- Copy every element through -->
|
||||
<xsl:template match="*">
|
||||
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
|
||||
|
|
@ -28,36 +26,30 @@
|
|||
<head>
|
||||
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
|
||||
</head>
|
||||
<body>
|
||||
<doc:div>
|
||||
<xsl:choose>
|
||||
<body show-in-this-page="true">
|
||||
<xsl:choose>
|
||||
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
|
||||
<p>
|
||||
This is the manual page for
|
||||
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
|
||||
</p>
|
||||
<p>
|
||||
<a href="../manpages.html">See a list of all manual pages</a>.
|
||||
</p>
|
||||
<p>
|
||||
This is the manual page for
|
||||
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
|
||||
</p>
|
||||
<p>
|
||||
<a href="../manpages.html">See a list of all manual pages</a>.
|
||||
</p>
|
||||
</xsl:when>
|
||||
<xsl:otherwise>
|
||||
<p>
|
||||
This is the documentation for
|
||||
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
|
||||
</p>
|
||||
<p>
|
||||
This is the documentation for
|
||||
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
|
||||
</p>
|
||||
</xsl:otherwise>
|
||||
</xsl:choose>
|
||||
<p>
|
||||
</xsl:choose>
|
||||
<p>
|
||||
For more general documentation, please see the
|
||||
<a href="../admin-guide.html">administrator's guide</a>.
|
||||
</p>
|
||||
<a href="../admin-guide.html">administrator's guide</a>.
|
||||
</p>
|
||||
|
||||
<doc:toc class="compact">
|
||||
<doc:heading>Table of Contents</doc:heading>
|
||||
</doc:toc>
|
||||
|
||||
<xsl:apply-templates select="body/div[@class='refentry']"/>
|
||||
</doc:div>
|
||||
<xsl:apply-templates select="body/div[@class='refentry']"/>
|
||||
</body>
|
||||
</html>
|
||||
</xsl:template>
|
||||
|
|
|
|||
|
|
@ -37,10 +37,12 @@
|
|||
{auth_backends, [rabbit_auth_backend_internal]},
|
||||
{delegate_count, 16},
|
||||
{trace_vhosts, []},
|
||||
{log_levels, [{connection, info}]},
|
||||
{tcp_listen_options, [binary,
|
||||
{packet, raw},
|
||||
{reuseaddr, true},
|
||||
{backlog, 128},
|
||||
{nodelay, true},
|
||||
{linger, {true, 0}},
|
||||
{exit_on_close, false}]}
|
||||
]}]}.
|
||||
|
|
|
|||
|
|
@ -56,9 +56,11 @@
|
|||
-record(binding, {source, key, destination, args = []}).
|
||||
-record(reverse_binding, {destination, key, source, args = []}).
|
||||
|
||||
-record(topic_trie_node, {trie_node, edge_count, binding_count}).
|
||||
-record(topic_trie_edge, {trie_edge, node_id}).
|
||||
-record(topic_trie_binding, {trie_binding, value = const}).
|
||||
|
||||
-record(trie_node, {exchange_name, node_id}).
|
||||
-record(trie_edge, {exchange_name, node_id, word}).
|
||||
-record(trie_binding, {exchange_name, node_id, destination}).
|
||||
|
||||
|
|
@ -97,13 +99,3 @@
|
|||
|
||||
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
|
||||
-define(DELETED_HEADER, <<"BCC">>).
|
||||
|
||||
-ifdef(debug).
|
||||
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
|
||||
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
|
||||
-define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
|
||||
-else.
|
||||
-define(LOGDEBUG0(F), ok).
|
||||
-define(LOGDEBUG(F,A), ok).
|
||||
-define(LOGMESSAGE(D,C,M,Co), ok).
|
||||
-endif.
|
||||
|
|
|
|||
|
|
@ -62,6 +62,10 @@ use_parallel_build yes
|
|||
|
||||
build.env-append HOME=${workpath}
|
||||
|
||||
build.env-append VERSION=${version}
|
||||
|
||||
destroot.env-append VERSION=${version}
|
||||
|
||||
destroot.target install_bin
|
||||
|
||||
destroot.destdir \
|
||||
|
|
|
|||
|
|
@ -144,32 +144,17 @@
|
|||
|
||||
-type child() :: pid() | 'undefined'.
|
||||
-type child_id() :: term().
|
||||
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
|
||||
-type modules() :: [module()] | 'dynamic'.
|
||||
-type restart() :: 'permanent' | 'transient' | 'temporary'.
|
||||
-type shutdown() :: 'brutal_kill' | timeout().
|
||||
-type worker() :: 'worker' | 'supervisor'.
|
||||
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
|
||||
-type sup_ref() :: (Name :: atom())
|
||||
| {Name :: atom(), Node :: node()}
|
||||
| {'global', Name :: atom()}
|
||||
| pid().
|
||||
-type child_spec() :: {Id :: child_id(),
|
||||
StartFunc :: mfargs(),
|
||||
Restart :: restart(),
|
||||
Shutdown :: shutdown(),
|
||||
Type :: worker(),
|
||||
Modules :: modules()}.
|
||||
|
||||
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
|
||||
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
|
||||
|
||||
-type startchild_err() :: 'already_present'
|
||||
| {'already_started', Child :: child()} | term().
|
||||
-type startchild_ret() :: {'ok', Child :: child()}
|
||||
| {'ok', Child :: child(), Info :: term()}
|
||||
| {'error', startchild_err()}.
|
||||
|
||||
-type group_name() :: any().
|
||||
|
||||
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
|
||||
|
|
@ -183,9 +168,9 @@
|
|||
Module :: module(),
|
||||
Args :: term().
|
||||
|
||||
-spec start_child(SupRef, ChildSpec) -> startchild_ret() when
|
||||
-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
|
||||
SupRef :: sup_ref(),
|
||||
ChildSpec :: child_spec() | (List :: [term()]).
|
||||
ChildSpec :: supervisor:child_spec() | (List :: [term()]).
|
||||
|
||||
-spec restart_child(SupRef, Id) -> Result when
|
||||
SupRef :: sup_ref(),
|
||||
|
|
@ -215,12 +200,12 @@
|
|||
Modules :: modules().
|
||||
|
||||
-spec check_childspecs(ChildSpecs) -> Result when
|
||||
ChildSpecs :: [child_spec()],
|
||||
ChildSpecs :: [supervisor:child_spec()],
|
||||
Result :: 'ok' | {'error', Error :: term()}.
|
||||
|
||||
-spec start_internal(Group, ChildSpecs) -> Result when
|
||||
Group :: group_name(),
|
||||
ChildSpecs :: [child_spec()],
|
||||
ChildSpecs :: [supervisor:child_spec()],
|
||||
Result :: startlink_ret().
|
||||
|
||||
-spec create_tables() -> Result when
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@
|
|||
-rabbit_boot_step({recovery,
|
||||
[{description, "exchange, queue and binding recovery"},
|
||||
{mfa, {rabbit, recover, []}},
|
||||
{requires, empty_db_check},
|
||||
{requires, core_initialized},
|
||||
{enables, routing_ready}]}).
|
||||
|
||||
-rabbit_boot_step({mirror_queue_slave_sup,
|
||||
|
|
@ -158,8 +158,9 @@
|
|||
{enables, networking}]}).
|
||||
|
||||
-rabbit_boot_step({direct_client,
|
||||
[{mfa, {rabbit_direct, boot, []}},
|
||||
{requires, log_relay}]}).
|
||||
[{description, "direct client"},
|
||||
{mfa, {rabbit_direct, boot, []}},
|
||||
{requires, log_relay}]}).
|
||||
|
||||
-rabbit_boot_step({networking,
|
||||
[{mfa, {rabbit_networking, boot, []}},
|
||||
|
|
@ -190,7 +191,7 @@
|
|||
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
|
||||
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
|
||||
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
|
||||
mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
|
||||
mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
|
||||
|
||||
%% HiPE compilation uses multiple cores anyway, but some bits are
|
||||
%% IO-bound so we can go faster if we parallelise a bit more. In
|
||||
|
|
@ -441,8 +442,7 @@ run_boot_step({StepName, Attributes}) ->
|
|||
[try
|
||||
apply(M,F,A)
|
||||
catch
|
||||
_:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
|
||||
[Reason, erlang:get_stacktrace()])
|
||||
_:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
|
||||
end || {M,F,A} <- MFAs],
|
||||
io:format("done~n"),
|
||||
ok
|
||||
|
|
@ -501,8 +501,14 @@ sort_boot_steps(UnsortedSteps) ->
|
|||
end])
|
||||
end.
|
||||
|
||||
boot_step_error(Reason, Stacktrace) ->
|
||||
boot_error("Error description:~n ~p~n~n"
|
||||
"Log files (may contain more information):~n ~s~n ~s~n~n"
|
||||
"Stack trace:~n ~p~n~n",
|
||||
[Reason, log_location(kernel), log_location(sasl), Stacktrace]).
|
||||
|
||||
boot_error(Format, Args) ->
|
||||
io:format("BOOT ERROR: " ++ Format, Args),
|
||||
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
|
||||
error_logger:error_msg(Format, Args),
|
||||
timer:sleep(1000),
|
||||
exit({?MODULE, failure_during_boot}).
|
||||
|
|
|
|||
|
|
@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
|
|||
|
||||
check_vhost_access(User = #user{ username = Username,
|
||||
auth_backend = Module }, VHostPath) ->
|
||||
?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
|
||||
check_access(
|
||||
fun() ->
|
||||
rabbit_vhost:exists(VHostPath) andalso
|
||||
|
|
|
|||
|
|
@ -31,10 +31,9 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(mfa_tuple() :: {atom(), atom(), list()}).
|
||||
-spec(start/0 :: () -> 'ok').
|
||||
-spec(stop/0 :: () -> 'ok').
|
||||
-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
|
||||
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
|
||||
-spec(on_node_up/1 :: (node()) -> 'ok').
|
||||
-spec(on_node_down/1 :: (node()) -> 'ok').
|
||||
|
||||
|
|
|
|||
|
|
@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
init(Q) ->
|
||||
?LOGDEBUG("Queue starting - ~p~n", [Q]),
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
State = #q{q = Q#amqqueue{pid = self()},
|
||||
|
|
@ -135,7 +134,6 @@ init(Q) ->
|
|||
|
||||
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
|
||||
RateTRef, AckTags, Deliveries, MTC) ->
|
||||
?LOGDEBUG("Queue starting - ~p~n", [Q]),
|
||||
case Owner of
|
||||
none -> ok;
|
||||
_ -> erlang:monitor(process, Owner)
|
||||
|
|
@ -1114,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
|
|||
|
||||
handle_info(maybe_expire, State) ->
|
||||
case is_unused(State) of
|
||||
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
|
||||
{stop, normal, State};
|
||||
true -> {stop, normal, State};
|
||||
false -> noreply(ensure_expiry_timer(State))
|
||||
end;
|
||||
|
||||
|
|
@ -1167,7 +1164,6 @@ handle_info({bump_credit, Msg}, State) ->
|
|||
noreply(State);
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOGDEBUG("Info in queue: ~p~n", [Info]),
|
||||
{stop, {unhandled_info, Info}, State}.
|
||||
|
||||
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
|
||||
|
|
|
|||
|
|
@ -354,8 +354,8 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
|
|||
|
||||
%% For bulk operations we lock the tables we are operating on in order
|
||||
%% to reduce the time complexity. Without the table locks we end up
|
||||
%% with num_tables*num_bulk_bindings row-level locks. Takiing each
|
||||
%% lock takes time proportional to the number of existing locks, thus
|
||||
%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
|
||||
%% takes time proportional to the number of existing locks, thus
|
||||
%% resulting in O(num_bulk_bindings^2) complexity.
|
||||
%%
|
||||
%% The locks need to be write locks since ultimately we end up
|
||||
|
|
|
|||
|
|
@ -250,9 +250,10 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
|
|||
handle_call(_Request, _From, State) ->
|
||||
noreply(State).
|
||||
|
||||
handle_cast({method, Method, Content, Flow}, State = #ch{conn_pid = Conn}) ->
|
||||
handle_cast({method, Method, Content, Flow},
|
||||
State = #ch{reader_pid = Reader}) ->
|
||||
case Flow of
|
||||
flow -> credit_flow:ack(Conn);
|
||||
flow -> credit_flow:ack(Reader);
|
||||
noflow -> ok
|
||||
end,
|
||||
try handle_method(Method, Content, State) of
|
||||
|
|
@ -1084,9 +1085,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
|
|||
|
||||
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
|
||||
uncommitted_acks = TAL}) ->
|
||||
ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)),
|
||||
State1 = new_tx(State),
|
||||
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
|
||||
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
|
||||
ack(TAL, State1),
|
||||
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
|
||||
|
||||
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
|
||||
rabbit_misc:protocol_error(
|
||||
|
|
|
|||
|
|
@ -28,8 +28,9 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
|
||||
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
|
||||
-spec(start_link/1 :: (rabbit_types:mfargs()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-endif.
|
||||
|
|
|
|||
|
|
@ -79,6 +79,12 @@ start() ->
|
|||
io:format(Format ++ " ...~n", Args1)
|
||||
end
|
||||
end,
|
||||
PrintInvalidCommandError =
|
||||
fun () ->
|
||||
print_error("invalid command '~s'",
|
||||
[string:join([atom_to_list(Command) | Args], " ")])
|
||||
end,
|
||||
|
||||
%% The reason we don't use a try/catch here is that rpc:call turns
|
||||
%% thrown errors into normal return values
|
||||
case catch action(Command, Node, Args, Opts, Inform) of
|
||||
|
|
@ -88,9 +94,11 @@ start() ->
|
|||
false -> io:format("...done.~n")
|
||||
end,
|
||||
rabbit_misc:quit(0);
|
||||
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
|
||||
print_error("invalid command '~s'",
|
||||
[string:join([atom_to_list(Command) | Args], " ")]),
|
||||
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
|
||||
PrintInvalidCommandError(),
|
||||
usage();
|
||||
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
|
||||
PrintInvalidCommandError(),
|
||||
usage();
|
||||
{'EXIT', {badarg, _}} ->
|
||||
print_error("invalid parameter: ~p", [Args]),
|
||||
|
|
|
|||
|
|
@ -355,11 +355,21 @@ peek_serial(XName) ->
|
|||
_ -> undefined
|
||||
end.
|
||||
|
||||
invalid_module(T) ->
|
||||
rabbit_log:warning(
|
||||
"Could not find exchange type ~s.~n", [T]),
|
||||
put({xtype_to_module, T}, rabbit_exchange_type_invalid),
|
||||
rabbit_exchange_type_invalid.
|
||||
|
||||
%% Used with atoms from records; e.g., the type is expected to exist.
|
||||
type_to_module(T) ->
|
||||
case get({xtype_to_module, T}) of
|
||||
undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
|
||||
put({xtype_to_module, T}, Module),
|
||||
Module;
|
||||
Module -> Module
|
||||
undefined ->
|
||||
case rabbit_registry:lookup_module(exchange, T) of
|
||||
{ok, Module} -> put({xtype_to_module, T}, Module),
|
||||
Module;
|
||||
{error, not_found} -> invalid_module(T)
|
||||
end;
|
||||
Module ->
|
||||
Module
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,47 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is VMware, Inc.
|
||||
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_exchange_type_invalid).
|
||||
-include("rabbit.hrl").
|
||||
|
||||
-behaviour(rabbit_exchange_type).
|
||||
|
||||
-export([description/0, serialise_events/0, route/2]).
|
||||
-export([validate/1, create/2, delete/3,
|
||||
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
|
||||
-include("rabbit_exchange_type_spec.hrl").
|
||||
|
||||
description() ->
|
||||
[{name, <<"invalid">>},
|
||||
{description,
|
||||
<<"Dummy exchange type, to be used when the intended one is not found.">>
|
||||
}].
|
||||
|
||||
serialise_events() -> false.
|
||||
|
||||
route(#exchange{name = Name, type = Type}, _) ->
|
||||
rabbit_misc:protocol_error(
|
||||
precondition_failed,
|
||||
"Cannot route message through ~s: exchange type ~s not found",
|
||||
[rabbit_misc:rs(Name), Type]).
|
||||
|
||||
validate(_X) -> ok.
|
||||
create(_Tx, _X) -> ok.
|
||||
delete(_Tx, _X, _Bs) -> ok.
|
||||
add_binding(_Tx, _X, _B) -> ok.
|
||||
remove_bindings(_Tx, _X, _Bs) -> ok.
|
||||
assert_args_equivalence(X, Args) ->
|
||||
rabbit_exchange:assert_args_equivalence(X, Args).
|
||||
|
|
@ -52,6 +52,7 @@ validate(_X) -> ok.
|
|||
create(_Tx, _X) -> ok.
|
||||
|
||||
delete(transaction, #exchange{name = X}, _Bs) ->
|
||||
trie_remove_all_nodes(X),
|
||||
trie_remove_all_edges(X),
|
||||
trie_remove_all_bindings(X),
|
||||
ok;
|
||||
|
|
@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
|
|||
add_binding(none, _Exchange, _Binding) ->
|
||||
ok.
|
||||
|
||||
remove_bindings(transaction, #exchange{name = X}, Bs) ->
|
||||
%% The remove process is split into two distinct phases. In the
|
||||
%% first phase we gather the lists of bindings and edges to
|
||||
%% delete, then in the second phase we process all the
|
||||
%% deletions. This is to prevent interleaving of read/write
|
||||
%% operations in mnesia that can adversely affect performance.
|
||||
{ToDelete, Paths} =
|
||||
lists:foldl(
|
||||
fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
|
||||
Path = [{FinalNode, _} | _] =
|
||||
follow_down_get_path(S, split_topic_key(K)),
|
||||
{[{FinalNode, D} | Acc],
|
||||
decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
|
||||
end, {[], gb_trees:empty()}, Bs),
|
||||
|
||||
[trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
|
||||
[trie_remove_edge(X, Parent, Node, W) ||
|
||||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
|
||||
remove_bindings(transaction, _X, Bs) ->
|
||||
%% See rabbit_binding:lock_route_tables for the rationale for
|
||||
%% taking table locks.
|
||||
case Bs of
|
||||
[_] -> ok;
|
||||
_ -> [mnesia:lock({table, T}, write) ||
|
||||
T <- [rabbit_topic_trie_node,
|
||||
rabbit_topic_trie_edge,
|
||||
rabbit_topic_trie_binding]]
|
||||
end,
|
||||
[begin
|
||||
Path = [{FinalNode, _} | _] =
|
||||
follow_down_get_path(X, split_topic_key(K)),
|
||||
trie_remove_binding(X, FinalNode, D),
|
||||
remove_path_if_empty(X, Path)
|
||||
end || #binding{source = X, key = K, destination = D} <- Bs],
|
||||
ok;
|
||||
remove_bindings(none, _X, _Bs) ->
|
||||
ok.
|
||||
|
||||
maybe_add_path(_X, [{root, none}], PathAcc) ->
|
||||
PathAcc;
|
||||
maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
|
||||
case gb_trees:is_defined(Node, PathAcc) of
|
||||
true -> PathAcc;
|
||||
false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
|
||||
trie_child_count(X, Node)}},
|
||||
PathAcc)
|
||||
end.
|
||||
|
||||
decrement_bindings(X, Path, PathAcc) ->
|
||||
with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
|
||||
Path, PathAcc).
|
||||
|
||||
decrement_edges(X, Path, PathAcc) ->
|
||||
with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
|
||||
Path, PathAcc).
|
||||
|
||||
with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
|
||||
PathAcc;
|
||||
with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
|
||||
{Parent, W, Counts} = gb_trees:get(Node, PathAcc),
|
||||
NewCounts = Fun(Counts),
|
||||
NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
|
||||
case NewCounts of
|
||||
{0, 0} -> decrement_edges(X, ParentPath,
|
||||
maybe_add_path(X, ParentPath, NewPathAcc));
|
||||
_ -> NewPathAcc
|
||||
end.
|
||||
|
||||
|
||||
assert_args_equivalence(X, Args) ->
|
||||
rabbit_exchange:assert_args_equivalence(X, Args).
|
||||
|
||||
|
|
@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
|
|||
error -> {error, Acc, Words}
|
||||
end.
|
||||
|
||||
remove_path_if_empty(_, [{root, none}]) ->
|
||||
ok;
|
||||
remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
|
||||
case mnesia:read(rabbit_topic_trie_node,
|
||||
#trie_node{exchange_name = X, node_id = Node}, write) of
|
||||
[] -> trie_remove_edge(X, Parent, Node, W),
|
||||
remove_path_if_empty(X, RestPath);
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
trie_child(X, Node, Word) ->
|
||||
case mnesia:read({rabbit_topic_trie_edge,
|
||||
#trie_edge{exchange_name = X,
|
||||
|
|
@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
|
|||
destination = '$1'}},
|
||||
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
|
||||
|
||||
trie_update_node_counts(X, Node, Field, Delta) ->
|
||||
E = case mnesia:read(rabbit_topic_trie_node,
|
||||
#trie_node{exchange_name = X,
|
||||
node_id = Node}, write) of
|
||||
[] -> #topic_trie_node{trie_node = #trie_node{
|
||||
exchange_name = X,
|
||||
node_id = Node},
|
||||
edge_count = 0,
|
||||
binding_count = 0};
|
||||
[E0] -> E0
|
||||
end,
|
||||
case setelement(Field, E, element(Field, E) + Delta) of
|
||||
#topic_trie_node{edge_count = 0, binding_count = 0} ->
|
||||
ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
|
||||
EN ->
|
||||
ok = mnesia:write(rabbit_topic_trie_node, EN, write)
|
||||
end.
|
||||
|
||||
trie_add_edge(X, FromNode, ToNode, W) ->
|
||||
trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
|
||||
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
|
||||
|
||||
trie_remove_edge(X, FromNode, ToNode, W) ->
|
||||
trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
|
||||
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
|
||||
|
||||
trie_edge_op(X, FromNode, ToNode, W, Op) ->
|
||||
|
|
@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
|
|||
write).
|
||||
|
||||
trie_add_binding(X, Node, D) ->
|
||||
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
|
||||
trie_binding_op(X, Node, D, fun mnesia:write/3).
|
||||
|
||||
trie_remove_binding(X, Node, D) ->
|
||||
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
|
||||
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
|
||||
|
||||
trie_binding_op(X, Node, D, Op) ->
|
||||
|
|
@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
|
|||
destination = D}},
|
||||
write).
|
||||
|
||||
trie_child_count(X, Node) ->
|
||||
count(rabbit_topic_trie_edge,
|
||||
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
|
||||
node_id = Node,
|
||||
_ = '_'},
|
||||
_ = '_'}).
|
||||
|
||||
trie_binding_count(X, Node) ->
|
||||
count(rabbit_topic_trie_binding,
|
||||
#topic_trie_binding{
|
||||
trie_binding = #trie_binding{exchange_name = X,
|
||||
node_id = Node,
|
||||
_ = '_'},
|
||||
_ = '_'}).
|
||||
|
||||
count(Table, Match) ->
|
||||
length(mnesia:match_object(Table, Match, read)).
|
||||
trie_remove_all_nodes(X) ->
|
||||
remove_all(rabbit_topic_trie_node,
|
||||
#topic_trie_node{trie_node = #trie_node{exchange_name = X,
|
||||
_ = '_'},
|
||||
_ = '_'}).
|
||||
|
||||
trie_remove_all_edges(X) ->
|
||||
remove_all(rabbit_topic_trie_edge,
|
||||
|
|
|
|||
|
|
@ -23,8 +23,7 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-export([debug/1, debug/2, message/4, info/1, info/2,
|
||||
warning/1, warning/2, error/1, error/2]).
|
||||
-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
|
|
@ -32,9 +31,15 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-export_type([level/0]).
|
||||
|
||||
-type(category() :: atom()).
|
||||
-type(level() :: 'info' | 'warning' | 'error').
|
||||
|
||||
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
|
||||
-spec(debug/1 :: (string()) -> 'ok').
|
||||
-spec(debug/2 :: (string(), [any()]) -> 'ok').
|
||||
|
||||
-spec(log/3 :: (category(), level(), string()) -> 'ok').
|
||||
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
|
||||
-spec(info/1 :: (string()) -> 'ok').
|
||||
-spec(info/2 :: (string(), [any()]) -> 'ok').
|
||||
-spec(warning/1 :: (string()) -> 'ok').
|
||||
|
|
@ -42,84 +47,47 @@
|
|||
-spec(error/1 :: (string()) -> 'ok').
|
||||
-spec(error/2 :: (string(), [any()]) -> 'ok').
|
||||
|
||||
-spec(message/4 :: (_,_,_,_) -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
|
||||
|
||||
debug(Fmt) ->
|
||||
gen_server:cast(?SERVER, {debug, Fmt}).
|
||||
log(Category, Level, Fmt, Args) when is_list(Args) ->
|
||||
gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
|
||||
|
||||
debug(Fmt, Args) when is_list(Args) ->
|
||||
gen_server:cast(?SERVER, {debug, Fmt, Args}).
|
||||
|
||||
message(Direction, Channel, MethodRecord, Content) ->
|
||||
gen_server:cast(?SERVER,
|
||||
{message, Direction, Channel, MethodRecord, Content}).
|
||||
|
||||
info(Fmt) ->
|
||||
gen_server:cast(?SERVER, {info, Fmt}).
|
||||
|
||||
info(Fmt, Args) when is_list(Args) ->
|
||||
gen_server:cast(?SERVER, {info, Fmt, Args}).
|
||||
|
||||
warning(Fmt) ->
|
||||
gen_server:cast(?SERVER, {warning, Fmt}).
|
||||
|
||||
warning(Fmt, Args) when is_list(Args) ->
|
||||
gen_server:cast(?SERVER, {warning, Fmt, Args}).
|
||||
|
||||
error(Fmt) ->
|
||||
gen_server:cast(?SERVER, {error, Fmt}).
|
||||
|
||||
error(Fmt, Args) when is_list(Args) ->
|
||||
gen_server:cast(?SERVER, {error, Fmt, Args}).
|
||||
info(Fmt) -> log(default, info, Fmt).
|
||||
info(Fmt, Args) -> log(default, info, Fmt, Args).
|
||||
warning(Fmt) -> log(default, warning, Fmt).
|
||||
warning(Fmt, Args) -> log(default, warning, Fmt, Args).
|
||||
error(Fmt) -> log(default, error, Fmt).
|
||||
error(Fmt, Args) -> log(default, error, Fmt, Args).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) -> {ok, none}.
|
||||
init([]) ->
|
||||
{ok, CatLevelList} = application:get_env(log_levels),
|
||||
CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
|
||||
{ok, orddict:from_list(CatLevels)}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_cast({debug, Fmt}, State) ->
|
||||
io:format("debug:: "), io:format(Fmt),
|
||||
error_logger:info_msg("debug:: " ++ Fmt),
|
||||
{noreply, State};
|
||||
handle_cast({debug, Fmt, Args}, State) ->
|
||||
io:format("debug:: "), io:format(Fmt, Args),
|
||||
error_logger:info_msg("debug:: " ++ Fmt, Args),
|
||||
{noreply, State};
|
||||
handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
|
||||
io:format("~s ch~p ~p~n",
|
||||
[case Direction of
|
||||
in -> "-->";
|
||||
out -> "<--" end,
|
||||
Channel,
|
||||
{MethodRecord, Content}]),
|
||||
{noreply, State};
|
||||
handle_cast({info, Fmt}, State) ->
|
||||
error_logger:info_msg(Fmt),
|
||||
{noreply, State};
|
||||
handle_cast({info, Fmt, Args}, State) ->
|
||||
error_logger:info_msg(Fmt, Args),
|
||||
{noreply, State};
|
||||
handle_cast({warning, Fmt}, State) ->
|
||||
error_logger:warning_msg(Fmt),
|
||||
{noreply, State};
|
||||
handle_cast({warning, Fmt, Args}, State) ->
|
||||
error_logger:warning_msg(Fmt, Args),
|
||||
{noreply, State};
|
||||
handle_cast({error, Fmt}, State) ->
|
||||
error_logger:error_msg(Fmt),
|
||||
{noreply, State};
|
||||
handle_cast({error, Fmt, Args}, State) ->
|
||||
error_logger:error_msg(Fmt, Args),
|
||||
{noreply, State};
|
||||
handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
|
||||
CatLevel = case orddict:find(Category, CatLevels) of
|
||||
{ok, L} -> L;
|
||||
error -> level(info)
|
||||
end,
|
||||
case level(Level) =< CatLevel of
|
||||
false -> ok;
|
||||
true -> (case Level of
|
||||
info -> fun error_logger:info_msg/2;
|
||||
warning -> fun error_logger:warning_msg/2;
|
||||
error -> fun error_logger:error_msg/2
|
||||
end)(Fmt, Args)
|
||||
end,
|
||||
{noreply, CatLevels};
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
|
@ -132,3 +100,9 @@ terminate(_Reason, _State) ->
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
level(info) -> 3;
|
||||
level(warning) -> 2;
|
||||
level(error) -> 1;
|
||||
level(none) -> 0.
|
||||
|
|
|
|||
|
|
@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
|
|||
true = link(GM),
|
||||
GM
|
||||
end,
|
||||
{ok, _TRef} =
|
||||
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
|
||||
ensure_gm_heartbeat(),
|
||||
{ok, #state { q = Q,
|
||||
gm = GM1,
|
||||
monitors = dict:new(),
|
||||
|
|
@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids},
|
|||
end, Monitors, Pids),
|
||||
noreply(State #state { monitors = Monitors1 }).
|
||||
|
||||
handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
|
||||
gm:broadcast(GM, heartbeat),
|
||||
ensure_gm_heartbeat(),
|
||||
noreply(State);
|
||||
|
||||
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
|
||||
State = #state { monitors = Monitors,
|
||||
death_fun = DeathFun }) ->
|
||||
|
|
@ -419,3 +423,6 @@ noreply(State) ->
|
|||
|
||||
reply(Reply, State) ->
|
||||
{reply, Reply, State, hibernate}.
|
||||
|
||||
ensure_gm_heartbeat() ->
|
||||
erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ add_mirror(Queue, MirrorNode) ->
|
|||
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
|
||||
MirrorNode, [Q]),
|
||||
rabbit_log:info(
|
||||
"Adding mirror of queue ~s on node ~p: ~p~n",
|
||||
"Adding mirror of ~s on node ~p: ~p~n",
|
||||
[rabbit_misc:rs(Name), MirrorNode, Result]),
|
||||
case Result of
|
||||
{ok, _Pid} -> ok;
|
||||
|
|
|
|||
|
|
@ -268,6 +268,11 @@ table_definitions() ->
|
|||
{type, ordered_set},
|
||||
{match, #reverse_route{reverse_binding = reverse_binding_match(),
|
||||
_='_'}}]},
|
||||
{rabbit_topic_trie_node,
|
||||
[{record_name, topic_trie_node},
|
||||
{attributes, record_info(fields, topic_trie_node)},
|
||||
{type, ordered_set},
|
||||
{match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
|
||||
{rabbit_topic_trie_edge,
|
||||
[{record_name, topic_trie_edge},
|
||||
{attributes, record_info(fields, topic_trie_edge)},
|
||||
|
|
@ -314,12 +319,12 @@ reverse_binding_match() ->
|
|||
_='_'}.
|
||||
binding_destination_match() ->
|
||||
resource_match('_').
|
||||
trie_node_match() ->
|
||||
#trie_node{ exchange_name = exchange_name_match(), _='_'}.
|
||||
trie_edge_match() ->
|
||||
#trie_edge{exchange_name = exchange_name_match(),
|
||||
_='_'}.
|
||||
#trie_edge{ exchange_name = exchange_name_match(), _='_'}.
|
||||
trie_binding_match() ->
|
||||
#trie_binding{exchange_name = exchange_name_match(),
|
||||
_='_'}.
|
||||
#trie_binding{exchange_name = exchange_name_match(), _='_'}.
|
||||
exchange_name_match() ->
|
||||
resource_match(exchange).
|
||||
queue_name_match() ->
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
|
||||
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
|
||||
sockname/1, peername/1, peercert/1]).
|
||||
sockname/1, peername/1, peercert/1, connection_string/2]).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
|
||||
|
|
@ -62,6 +62,8 @@
|
|||
-spec(peercert/1 ::
|
||||
(socket())
|
||||
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
|
||||
-spec(connection_string/2 ::
|
||||
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
@ -141,3 +143,20 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
|
|||
|
||||
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
|
||||
peercert(Sock) when is_port(Sock) -> nossl.
|
||||
|
||||
connection_string(Sock, Direction) ->
|
||||
{From, To} = case Direction of
|
||||
inbound -> {fun peername/1, fun sockname/1};
|
||||
outbound -> {fun sockname/1, fun peername/1}
|
||||
end,
|
||||
case {From(Sock), To(Sock)} of
|
||||
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
|
||||
{ok, lists:flatten(
|
||||
io_lib:format("~s:~p -> ~s:~p",
|
||||
[rabbit_misc:ntoab(FromAddress), FromPort,
|
||||
rabbit_misc:ntoab(ToAddress), ToPort]))};
|
||||
{{error, _Reason} = Error, _} ->
|
||||
Error;
|
||||
{_, {error, _Reason} = Error} ->
|
||||
Error
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@
|
|||
close_connection/2, force_connection_event_refresh/0]).
|
||||
|
||||
%%used by TCP-based transports, e.g. STOMP adapter
|
||||
-export([check_tcp_listener_address/2,
|
||||
-export([tcp_listener_addresses/1, tcp_listener_spec/6,
|
||||
ensure_ssl/0, ssl_transform_fun/1]).
|
||||
|
||||
-export([tcp_listener_started/3, tcp_listener_stopped/3,
|
||||
|
|
@ -47,12 +47,16 @@
|
|||
-export_type([ip_port/0, hostname/0]).
|
||||
|
||||
-type(hostname() :: inet:hostname()).
|
||||
-type(ip_port() :: inet:ip_port()).
|
||||
-type(ip_port() :: inet:port_number()).
|
||||
|
||||
-type(family() :: atom()).
|
||||
-type(listener_config() :: ip_port() |
|
||||
{hostname(), ip_port()} |
|
||||
{hostname(), ip_port(), family()}).
|
||||
-type(address() :: {inet:ip_address(), ip_port(), family()}).
|
||||
-type(name_prefix() :: atom()).
|
||||
-type(protocol() :: atom()).
|
||||
-type(label() :: string()).
|
||||
|
||||
-spec(start/0 :: () -> 'ok').
|
||||
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
|
||||
|
|
@ -76,8 +80,10 @@
|
|||
-spec(force_connection_event_refresh/0 :: () -> 'ok').
|
||||
|
||||
-spec(on_node_down/1 :: (node()) -> 'ok').
|
||||
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
|
||||
-> [{inet:ip_address(), ip_port(), family(), atom()}]).
|
||||
-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
|
||||
-spec(tcp_listener_spec/6 ::
|
||||
(name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
|
||||
label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
|
||||
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
|
||||
-spec(ssl_transform_fun/1 ::
|
||||
(rabbit_types:infos())
|
||||
|
|
@ -140,39 +146,6 @@ start() ->
|
|||
transient, infinity, supervisor, [rabbit_client_sup]}),
|
||||
ok.
|
||||
|
||||
%% inet_parse:address takes care of ip string, like "0.0.0.0"
|
||||
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
|
||||
%% and runs 'inet_gethost' port process for dns lookups.
|
||||
%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
|
||||
|
||||
getaddr(Host, Family) ->
|
||||
case inet_parse:address(Host) of
|
||||
{ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
|
||||
{error, _} -> gethostaddr(Host, Family)
|
||||
end.
|
||||
|
||||
gethostaddr(Host, auto) ->
|
||||
Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
|
||||
case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
|
||||
[] -> host_lookup_error(Host, Lookups);
|
||||
IPs -> IPs
|
||||
end;
|
||||
|
||||
gethostaddr(Host, Family) ->
|
||||
case inet:getaddr(Host, Family) of
|
||||
{ok, IPAddress} -> [{IPAddress, Family}];
|
||||
{error, Reason} -> host_lookup_error(Host, Reason)
|
||||
end.
|
||||
|
||||
host_lookup_error(Host, Reason) ->
|
||||
error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
|
||||
throw({error, {invalid_host, Host, Reason}}).
|
||||
|
||||
resolve_family({_,_,_,_}, auto) -> inet;
|
||||
resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
|
||||
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
|
||||
resolve_family(_, F) -> F.
|
||||
|
||||
ensure_ssl() ->
|
||||
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
|
||||
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
|
||||
|
|
@ -191,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
|
|||
fun (Sock) ->
|
||||
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
|
||||
{ok, SslSock} ->
|
||||
rabbit_log:info("upgraded TCP connection ~p to SSL~n",
|
||||
[self()]),
|
||||
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
|
||||
{error, Reason} ->
|
||||
{error, {ssl_upgrade_error, Reason}};
|
||||
|
|
@ -201,31 +172,36 @@ ssl_transform_fun(SslOpts) ->
|
|||
end
|
||||
end.
|
||||
|
||||
check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
|
||||
check_tcp_listener_address_auto(NamePrefix, Port);
|
||||
|
||||
check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
|
||||
tcp_listener_addresses(Port) when is_integer(Port) ->
|
||||
tcp_listener_addresses_auto(Port);
|
||||
tcp_listener_addresses({"auto", Port}) ->
|
||||
%% Variant to prevent lots of hacking around in bash and batch files
|
||||
check_tcp_listener_address_auto(NamePrefix, Port);
|
||||
|
||||
check_tcp_listener_address(NamePrefix, {Host, Port}) ->
|
||||
tcp_listener_addresses_auto(Port);
|
||||
tcp_listener_addresses({Host, Port}) ->
|
||||
%% auto: determine family IPv4 / IPv6 after converting to IP address
|
||||
check_tcp_listener_address(NamePrefix, {Host, Port, auto});
|
||||
tcp_listener_addresses({Host, Port, auto});
|
||||
tcp_listener_addresses({Host, Port, Family0})
|
||||
when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
|
||||
[{IPAddress, Port, Family} ||
|
||||
{IPAddress, Family} <- getaddr(Host, Family0)];
|
||||
tcp_listener_addresses({_Host, Port, _Family0}) ->
|
||||
error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
|
||||
throw({error, {invalid_port, Port}}).
|
||||
|
||||
check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
|
||||
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
|
||||
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
|
||||
[Port]),
|
||||
throw({error, {invalid_port, Port}})
|
||||
end,
|
||||
[{IPAddress, Port, Family,
|
||||
rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
|
||||
{IPAddress, Family} <- getaddr(Host, Family0)].
|
||||
|
||||
check_tcp_listener_address_auto(NamePrefix, Port) ->
|
||||
lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
|
||||
tcp_listener_addresses_auto(Port) ->
|
||||
lists:append([tcp_listener_addresses(Listener) ||
|
||||
Listener <- port_to_listeners(Port)]).
|
||||
|
||||
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
|
||||
Protocol, Label, OnConnect) ->
|
||||
{rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
|
||||
{tcp_listener_sup, start_link,
|
||||
[IPAddress, Port, [Family | SocketOpts],
|
||||
{?MODULE, tcp_listener_started, [Protocol]},
|
||||
{?MODULE, tcp_listener_stopped, [Protocol]},
|
||||
OnConnect, Label]},
|
||||
transient, infinity, supervisor, [tcp_listener_sup]}.
|
||||
|
||||
start_tcp_listener(Listener) ->
|
||||
start_listener(Listener, amqp, "TCP Listener",
|
||||
{?MODULE, start_client, []}).
|
||||
|
|
@ -235,27 +211,26 @@ start_ssl_listener(Listener, SslOpts) ->
|
|||
{?MODULE, start_ssl_client, [SslOpts]}).
|
||||
|
||||
start_listener(Listener, Protocol, Label, OnConnect) ->
|
||||
[start_listener0(Spec, Protocol, Label, OnConnect) ||
|
||||
Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
|
||||
[start_listener0(Address, Protocol, Label, OnConnect) ||
|
||||
Address <- tcp_listener_addresses(Listener)],
|
||||
ok.
|
||||
|
||||
start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
|
||||
{ok,_} = supervisor:start_child(
|
||||
rabbit_sup,
|
||||
{Name,
|
||||
{tcp_listener_sup, start_link,
|
||||
[IPAddress, Port, [Family | tcp_opts()],
|
||||
{?MODULE, tcp_listener_started, [Protocol]},
|
||||
{?MODULE, tcp_listener_stopped, [Protocol]},
|
||||
OnConnect, Label]},
|
||||
transient, infinity, supervisor, [tcp_listener_sup]}).
|
||||
start_listener0(Address, Protocol, Label, OnConnect) ->
|
||||
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
|
||||
Protocol, Label, OnConnect),
|
||||
case supervisor:start_child(rabbit_sup, Spec) of
|
||||
{ok, _} -> ok;
|
||||
{error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
|
||||
exit({could_not_start_tcp_listener,
|
||||
{rabbit_misc:ntoa(IPAddress), Port}})
|
||||
end.
|
||||
|
||||
stop_tcp_listener(Listener) ->
|
||||
[stop_tcp_listener0(Spec) ||
|
||||
Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
|
||||
[stop_tcp_listener0(Address) ||
|
||||
Address <- tcp_listener_addresses(Listener)],
|
||||
ok.
|
||||
|
||||
stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
|
||||
stop_tcp_listener0({IPAddress, Port, _Family}) ->
|
||||
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
|
||||
ok = supervisor:terminate_child(rabbit_sup, Name),
|
||||
ok = supervisor:delete_child(rabbit_sup, Name).
|
||||
|
|
@ -294,6 +269,16 @@ start_client(Sock, SockTransform) ->
|
|||
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
|
||||
ok = rabbit_net:controlling_process(Sock, Reader),
|
||||
Reader ! {go, Sock, SockTransform},
|
||||
|
||||
%% In the event that somebody floods us with connections, the
|
||||
%% reader processes can spew log events at error_logger faster
|
||||
%% than it can keep up, causing its mailbox to grow unbounded
|
||||
%% until we eat all the memory available and crash. So here is a
|
||||
%% meaningless synchronous call to the underlying gen_event
|
||||
%% mechanism. When it returns the mailbox is drained, and we
|
||||
%% return to our caller to accept more connetions.
|
||||
gen_event:which_handlers(error_logger),
|
||||
|
||||
Reader.
|
||||
|
||||
start_client(Sock) ->
|
||||
|
|
@ -363,6 +348,38 @@ tcp_opts() ->
|
|||
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
|
||||
Opts.
|
||||
|
||||
%% inet_parse:address takes care of ip string, like "0.0.0.0"
|
||||
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
|
||||
%% and runs 'inet_gethost' port process for dns lookups.
|
||||
%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
|
||||
getaddr(Host, Family) ->
|
||||
case inet_parse:address(Host) of
|
||||
{ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
|
||||
{error, _} -> gethostaddr(Host, Family)
|
||||
end.
|
||||
|
||||
gethostaddr(Host, auto) ->
|
||||
Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
|
||||
case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
|
||||
[] -> host_lookup_error(Host, Lookups);
|
||||
IPs -> IPs
|
||||
end;
|
||||
|
||||
gethostaddr(Host, Family) ->
|
||||
case inet:getaddr(Host, Family) of
|
||||
{ok, IPAddress} -> [{IPAddress, Family}];
|
||||
{error, Reason} -> host_lookup_error(Host, Reason)
|
||||
end.
|
||||
|
||||
host_lookup_error(Host, Reason) ->
|
||||
error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
|
||||
throw({error, {invalid_host, Host, Reason}}).
|
||||
|
||||
resolve_family({_,_,_,_}, auto) -> inet;
|
||||
resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
|
||||
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
|
||||
resolve_family(_, F) -> F.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% There are three kinds of machine (for our purposes).
|
||||
|
|
|
|||
|
|
@ -55,13 +55,20 @@ start() ->
|
|||
CmdArgsAndOpts -> CmdArgsAndOpts
|
||||
end,
|
||||
Command = list_to_atom(Command0),
|
||||
PrintInvalidCommandError =
|
||||
fun () ->
|
||||
print_error("invalid command '~s'",
|
||||
[string:join([atom_to_list(Command) | Args], " ")])
|
||||
end,
|
||||
|
||||
case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
|
||||
ok ->
|
||||
rabbit_misc:quit(0);
|
||||
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
|
||||
print_error("invalid command '~s'",
|
||||
[string:join([atom_to_list(Command) | Args], " ")]),
|
||||
PrintInvalidCommandError(),
|
||||
usage();
|
||||
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
|
||||
PrintInvalidCommandError(),
|
||||
usage();
|
||||
{error, Reason} ->
|
||||
print_error("~p", [Reason]),
|
||||
|
|
@ -325,6 +332,9 @@ lookup_plugins(Names, AllPlugins) ->
|
|||
read_enabled_plugins(PluginsFile) ->
|
||||
case rabbit_file:read_term_file(PluginsFile) of
|
||||
{ok, [Plugins]} -> Plugins;
|
||||
{ok, []} -> [];
|
||||
{ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
|
||||
PluginsFile}});
|
||||
{error, enoent} -> [];
|
||||
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
|
||||
PluginsFile, Reason}})
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
-define(HANDSHAKE_TIMEOUT, 10).
|
||||
-define(NORMAL_TIMEOUT, 3).
|
||||
-define(CLOSING_TIMEOUT, 1).
|
||||
-define(CLOSING_TIMEOUT, 30).
|
||||
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
|
||||
-define(SILENT_CLOSE_DELAY, 3).
|
||||
|
||||
|
|
@ -173,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
|
|||
server_capabilities(_) ->
|
||||
[].
|
||||
|
||||
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
|
||||
|
||||
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
|
||||
|
||||
socket_op(Sock, Fun) ->
|
||||
case Fun(Sock) of
|
||||
{ok, Res} -> Res;
|
||||
{error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
|
||||
[self(), Reason]),
|
||||
rabbit_log:info("closing TCP connection ~p~n",
|
||||
[self()]),
|
||||
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
|
||||
[self(), Reason]),
|
||||
exit(normal)
|
||||
end.
|
||||
|
||||
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
|
||||
Sock, SockTransform) ->
|
||||
process_flag(trap_exit, true),
|
||||
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
|
||||
PeerAddressS = rabbit_misc:ntoab(PeerAddress),
|
||||
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
|
||||
[self(), PeerAddressS, PeerPort]),
|
||||
ConnStr = socket_op(Sock, fun (Sock0) ->
|
||||
rabbit_net:connection_string(
|
||||
Sock0, inbound)
|
||||
end),
|
||||
log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
|
||||
ClientSock = socket_op(Sock, SockTransform),
|
||||
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
|
||||
handshake_timeout),
|
||||
|
|
@ -223,17 +224,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
|
|||
try
|
||||
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
|
||||
State, #v1.stats_timer),
|
||||
handshake, 8))
|
||||
handshake, 8)),
|
||||
log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
|
||||
catch
|
||||
Ex -> (if Ex == connection_closed_abruptly ->
|
||||
fun rabbit_log:warning/2;
|
||||
true ->
|
||||
fun rabbit_log:error/2
|
||||
end)("exception on TCP connection ~p from ~s:~p~n~p~n",
|
||||
[self(), PeerAddressS, PeerPort, Ex])
|
||||
Ex -> log(case Ex of
|
||||
connection_closed_abruptly -> warning;
|
||||
_ -> error
|
||||
end, "closing AMQP connection ~p (~s):~n~p~n",
|
||||
[self(), ConnStr, Ex])
|
||||
after
|
||||
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
|
||||
[self(), PeerAddressS, PeerPort]),
|
||||
%% We don't close the socket explicitly. The reader is the
|
||||
%% controlling process and hence its termination will close
|
||||
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
|
||||
|
|
@ -404,8 +403,8 @@ handle_dependent_exit(ChPid, Reason, State) ->
|
|||
{_Channel, controlled} ->
|
||||
maybe_close(control_throttle(State));
|
||||
{Channel, uncontrolled} ->
|
||||
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
|
||||
[self(), Channel, Reason]),
|
||||
log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
|
||||
[self(), Channel, Reason]),
|
||||
maybe_close(handle_exception(control_throttle(State),
|
||||
Channel, Reason))
|
||||
end.
|
||||
|
|
@ -449,9 +448,10 @@ wait_for_channel_termination(N, TimerRef) ->
|
|||
{_Channel, controlled} ->
|
||||
wait_for_channel_termination(N-1, TimerRef);
|
||||
{Channel, uncontrolled} ->
|
||||
rabbit_log:error("connection ~p, channel ~p - "
|
||||
"error while terminating:~n~p~n",
|
||||
[self(), Channel, Reason]),
|
||||
log(error,
|
||||
"AMQP connection ~p, channel ~p - "
|
||||
"error while terminating:~n~p~n",
|
||||
[self(), Channel, Reason]),
|
||||
wait_for_channel_termination(N-1, TimerRef)
|
||||
end;
|
||||
cancel_wait ->
|
||||
|
|
|
|||
|
|
@ -28,7 +28,8 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
|
||||
-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
-include_lib("public_key/include/public_key.hrl").
|
||||
|
||||
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
|
||||
-export([peer_cert_subject_item/2]).
|
||||
-export([peer_cert_subject_items/2]).
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
|
||||
|
|
@ -34,8 +34,8 @@
|
|||
-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
|
||||
-spec(peer_cert_subject/1 :: (certificate()) -> string()).
|
||||
-spec(peer_cert_validity/1 :: (certificate()) -> string()).
|
||||
-spec(peer_cert_subject_item/2 ::
|
||||
(certificate(), tuple()) -> string() | 'not_found').
|
||||
-spec(peer_cert_subject_items/2 ::
|
||||
(certificate(), tuple()) -> [string()] | 'not_found').
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
@ -59,8 +59,8 @@ peer_cert_subject(Cert) ->
|
|||
format_rdn_sequence(Subject)
|
||||
end, Cert).
|
||||
|
||||
%% Return a part of the certificate's subject.
|
||||
peer_cert_subject_item(Cert, Type) ->
|
||||
%% Return the parts of the certificate's subject.
|
||||
peer_cert_subject_items(Cert, Type) ->
|
||||
cert_info(fun(#'OTPCertificate' {
|
||||
tbsCertificate = #'OTPTBSCertificate' {
|
||||
subject = Subject }}) ->
|
||||
|
|
@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
|
|||
case [V || #'AttributeTypeAndValue'{type = T, value = V}
|
||||
<- lists:flatten(RDNs),
|
||||
T == Type] of
|
||||
[Val] -> format_asn1_value(Val);
|
||||
[] -> not_found
|
||||
[] -> not_found;
|
||||
L -> [format_asn1_value(V) || V <- L]
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
|
|
@ -150,9 +150,11 @@ escape_rdn_value([$ ], middle) ->
|
|||
escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
|
||||
C =:= $<; C =:= $>; C =:= $\\ ->
|
||||
[$\\, C | escape_rdn_value(S, middle)];
|
||||
escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
|
||||
%% only U+0000 needs escaping, but for display purposes it's handy
|
||||
%% to escape all non-printable chars
|
||||
escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
|
||||
%% Of ASCII characters only U+0000 needs escaping, but for display
|
||||
%% purposes it's handy to escape all non-printable chars. All non-ASCII
|
||||
%% characters get converted to UTF-8 sequences and then escaped. We've
|
||||
%% already got a UTF-8 sequence here, so just escape it.
|
||||
lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
|
||||
escape_rdn_value(S, middle);
|
||||
escape_rdn_value([C | S], middle) ->
|
||||
|
|
@ -167,6 +169,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
|
|||
Min1, Min2, S1, S2, $Z]}) ->
|
||||
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
|
||||
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
|
||||
%% We appear to get an untagged value back for an ia5string
|
||||
%% (e.g. domainComponent).
|
||||
format_asn1_value(V) when is_list(V) ->
|
||||
V;
|
||||
format_asn1_value(V) ->
|
||||
io_lib:format("~p", [V]).
|
||||
|
||||
|
|
|
|||
|
|
@ -28,12 +28,9 @@
|
|||
binding/0, binding_source/0, binding_destination/0,
|
||||
amqqueue/0, exchange/0,
|
||||
connection/0, protocol/0, user/0, internal_user/0,
|
||||
username/0, password/0, password_hash/0, ok/1, error/1,
|
||||
ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
|
||||
connection_exit/0]).
|
||||
|
||||
-type(channel_exit() :: no_return()).
|
||||
-type(connection_exit() :: no_return()).
|
||||
username/0, password/0, password_hash/0,
|
||||
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
|
||||
channel_exit/0, connection_exit/0, mfargs/0]).
|
||||
|
||||
-type(maybe(T) :: T | 'none').
|
||||
-type(vhost() :: binary()).
|
||||
|
|
@ -156,4 +153,9 @@
|
|||
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
|
||||
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
|
||||
|
||||
-type(channel_exit() :: no_return()).
|
||||
-type(connection_exit() :: no_return()).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-endif. % use_specs
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@
|
|||
-rabbit_upgrade({gm, mnesia, []}).
|
||||
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
|
||||
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
|
||||
-rabbit_upgrade({topic_trie_node, mnesia, []}).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
|
|
@ -54,6 +55,7 @@
|
|||
-spec(gm/0 :: () -> 'ok').
|
||||
-spec(exchange_scratch/0 :: () -> 'ok').
|
||||
-spec(mirrored_supervisor/0 :: () -> 'ok').
|
||||
-spec(topic_trie_node/0 :: () -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
@ -177,6 +179,12 @@ mirrored_supervisor() ->
|
|||
[{record_name, mirrored_sup_childspec},
|
||||
{attributes, [key, mirroring_pid, childspec]}]).
|
||||
|
||||
topic_trie_node() ->
|
||||
create(rabbit_topic_trie_node,
|
||||
[{record_name, topic_trie_node},
|
||||
{attributes, [trie_node, edge_count, binding_count]},
|
||||
{type, ordered_set}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
transform(TableName, Fun, FieldList) ->
|
||||
|
|
|
|||
|
|
@ -169,12 +169,10 @@ call(Pid, Msg) ->
|
|||
%%---------------------------------------------------------------------------
|
||||
|
||||
assemble_frame(Channel, MethodRecord, Protocol) ->
|
||||
?LOGMESSAGE(out, Channel, MethodRecord, none),
|
||||
rabbit_binary_generator:build_simple_method_frame(
|
||||
Channel, MethodRecord, Protocol).
|
||||
|
||||
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
|
||||
?LOGMESSAGE(out, Channel, MethodRecord, Content),
|
||||
MethodName = rabbit_misc:method_record_type(MethodRecord),
|
||||
true = Protocol:method_has_content(MethodName), % assertion
|
||||
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
|
||||
|
|
|
|||
|
|
@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
|
|||
ok;
|
||||
{error, normal} ->
|
||||
case Child#child.restart_type of
|
||||
permanent -> ReportError(normal);
|
||||
{permanent, _Delay} -> ReportError(normal);
|
||||
permanent -> ReportError(normal, Child);
|
||||
{permanent, _Delay} -> ReportError(normal, Child);
|
||||
_ -> ok
|
||||
end;
|
||||
{error, OtherReason} ->
|
||||
|
|
|
|||
|
|
@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
|
|||
{ok, Mod} = inet_db:lookup_socket(LSock),
|
||||
inet_db:register_socket(Sock, Mod),
|
||||
|
||||
try
|
||||
%% report
|
||||
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
|
||||
{PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
|
||||
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
|
||||
[rabbit_misc:ntoab(Address), Port,
|
||||
rabbit_misc:ntoab(PeerAddress), PeerPort]),
|
||||
%% In the event that somebody floods us with connections we can spew
|
||||
%% the above message at error_logger faster than it can keep up.
|
||||
%% So error_logger's mailbox grows unbounded until we eat all the
|
||||
%% memory available and crash. So here's a meaningless synchronous call
|
||||
%% to the underlying gen_event mechanism - when it returns the mailbox
|
||||
%% is drained.
|
||||
gen_event:which_handlers(error_logger),
|
||||
%% handle
|
||||
file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
|
||||
ok = file_handle_cache:obtain()
|
||||
catch {inet_error, Reason} ->
|
||||
gen_tcp:close(Sock),
|
||||
error_logger:error_msg("unable to accept TCP connection: ~p~n",
|
||||
[Reason])
|
||||
end,
|
||||
%% handle
|
||||
file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
|
||||
ok = file_handle_cache:obtain(),
|
||||
|
||||
%% accept more
|
||||
accept(State);
|
||||
|
|
@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
|
|||
|
||||
handle_info({inet_async, LSock, Ref, {error, Reason}},
|
||||
State=#state{sock=LSock, ref=Ref}) ->
|
||||
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
|
||||
{AddressS, Port} = case inet:sockname(LSock) of
|
||||
{ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
|
||||
{error, _} -> {"unknown", unknown}
|
||||
end,
|
||||
error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
|
||||
[rabbit_misc:ntoab(Address), Port, Reason]),
|
||||
[AddressS, Port, Reason]),
|
||||
accept(State);
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
|
|
@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
|
||||
|
||||
accept(State = #state{sock=LSock}) ->
|
||||
case prim_inet:async_accept(LSock, -1) of
|
||||
{ok, Ref} -> {noreply, State#state{ref=Ref}};
|
||||
|
|
|
|||
|
|
@ -25,7 +25,11 @@
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-endif.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -28,9 +28,14 @@
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-spec(start_link/8 ::
|
||||
(gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
|
||||
atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
|
||||
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
|
||||
integer(), atom(), mfargs(), mfargs(), string()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-endif.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
|
|||
label = Label}};
|
||||
{error, Reason} ->
|
||||
error_logger:error_msg(
|
||||
"failed to start ~s on ~s:~p - ~p~n",
|
||||
[Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
|
||||
"failed to start ~s on ~s:~p - ~p (~s)~n",
|
||||
[Label, rabbit_misc:ntoab(IPAddress), Port,
|
||||
Reason, inet:format_error(Reason)]),
|
||||
{stop, {cannot_listen, IPAddress, Port, Reason}}
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -26,12 +26,16 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-spec(start_link/7 ::
|
||||
(gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
|
||||
mfa(), string()) -> rabbit_types:ok_pid_or_error()).
|
||||
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
|
||||
mfargs(), mfargs(), mfargs(), string()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
-spec(start_link/8 ::
|
||||
(gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
|
||||
mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
|
||||
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
|
||||
mfargs(), mfargs(), mfargs(), integer(), string()) ->
|
||||
rabbit_types:ok_pid_or_error()).
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
|
|||
|
|
@ -37,10 +37,11 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
|
||||
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
|
||||
-spec(submit_async/1 ::
|
||||
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
|
||||
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
|
||||
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
|
||||
-spec(idle/1 :: (any()) -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
|
|
|||
|
|
@ -29,12 +29,12 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(mfargs() :: {atom(), atom(), [any()]}).
|
||||
|
||||
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
|
||||
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
|
||||
-spec(submit_async/2 ::
|
||||
(pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
|
||||
-spec(run/1 :: (fun (() -> A)) -> A;
|
||||
({atom(), atom(), [any()]}) -> any()).
|
||||
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
|
||||
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
|
||||
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
|
||||
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
|
|
|||
Loading…
Reference in New Issue