Merging default into bug23657

This commit is contained in:
Matthew Sackman 2011-02-17 11:56:49 +00:00
commit 68195be9ee
23 changed files with 215 additions and 134 deletions

View File

@ -9,20 +9,20 @@
</refentryinfo> </refentryinfo>
<refmeta> <refmeta>
<refentrytitle>rabbitmq.conf</refentrytitle> <refentrytitle>rabbitmq-env.conf</refentrytitle>
<manvolnum>5</manvolnum> <manvolnum>5</manvolnum>
<refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
</refmeta> </refmeta>
<refnamediv> <refnamediv>
<refname>rabbitmq.conf</refname> <refname>rabbitmq-env.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose> <refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv> </refnamediv>
<refsect1> <refsect1>
<title>Description</title> <title>Description</title>
<para> <para>
<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the
defaults built in to the RabbitMQ startup scripts. defaults built in to the RabbitMQ startup scripts.
</para> </para>
<para> <para>
@ -33,7 +33,7 @@ operator), including line comments starting with "#".
</para> </para>
<para> <para>
In order of preference, the startup scripts get their values from the In order of preference, the startup scripts get their values from the
environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the
built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar> built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar>
setting, setting,
</para> </para>
@ -48,26 +48,26 @@ empty string, then
<envar>NODENAME</envar> <envar>NODENAME</envar>
</para> </para>
<para> <para>
from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent
or set equal to the empty string then the default value from the or set equal to the empty string then the default value from the
startup script is used. startup script is used.
</para> </para>
<para> <para>
The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the
environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the <envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the
<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc.
</para> </para>
<para role="example-prefix">For example:</para> <para role="example-prefix">For example:</para>
<screen role="example-multiline"> <screen role="example-multiline">
# I am a complete /etc/rabbitmq/rabbitmq.conf file. # I am a complete /etc/rabbitmq/rabbitmq-env.conf file.
# Comment lines start with a hash character. # Comment lines start with a hash character.
# This is a /bin/sh script file - use ordinary envt var syntax # This is a /bin/sh script file - use ordinary envt var syntax
NODENAME=hare NODENAME=hare
</screen> </screen>
<para role="example"> <para role="example">
This is an example of a complete This is an example of a complete
<filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang
node name from "rabbit" to "hare". node name from "rabbit" to "hare".
</para> </para>

View File

@ -124,7 +124,7 @@ Defaults to 5672.
<refsect1> <refsect1>
<title>See also</title> <title>See also</title>
<para> <para>
<citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para> </para>
</refsect1> </refsect1>

View File

@ -34,4 +34,11 @@
{collect_statistics, none}, {collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]}, {auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16}]}]}. {delegate_count, 16},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
{exit_on_close, false}]}
]}]}.

View File

@ -91,6 +91,9 @@ fi
%post %post
/sbin/chkconfig --add %{name} /sbin/chkconfig --add %{name}
if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then
mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf
fi
%preun %preun
if [ $1 = 0 ]; then if [ $1 = 0 ]; then

View File

@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq
case "$1" in case "$1" in
configure) configure)
if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
[ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then
mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf
fi
;; ;;
abort-upgrade|abort-remove|abort-deconfigure) abort-upgrade|abort-remove|abort-deconfigure)

View File

@ -81,7 +81,7 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome}
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \
${realsbin}/rabbitmq-env ${realsbin}/rabbitmq-env
foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} { foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
@ -102,7 +102,7 @@ post-destroot {
file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/ file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/
} }
pre-install { pre-install {

View File

@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*} NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file # Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
fi
[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf

View File

@ -16,7 +16,6 @@
## ##
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]" -kernel inet_default_connect_options [{nodelay,true}]"
CONFIG_FILE=/etc/rabbitmq/rabbitmq CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq LOG_BASE=/var/log/rabbitmq

View File

@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+W w ^ +W w ^
+A30 ^ +A30 ^
+P 1048576 ^ +P 1048576 ^
-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^ -kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^ !RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^ -kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^

View File

@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-s rabbit ^ -s rabbit ^
+W w ^ +W w ^
+A30 ^ +A30 ^
-kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^ -kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^ !RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^ -kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^

View File

@ -18,7 +18,7 @@
-behaviour(gen_server2). -behaviour(gen_server2).
-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). -export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}). [{pid(), term()}]}).
-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-endif. -endif.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -111,22 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids). end, {[], orddict:new()}, Pids).
delegate_count([RemoteNode | _]) ->
{ok, Count} = case application:get_env(rabbit, delegate_count) of
undefined -> rpc:call(RemoteNode, application, get_env,
[rabbit, delegate_count]);
Result -> Result
end,
Count.
delegate_name(Hash) -> delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)). list_to_atom("delegate_" ++ integer_to_list(Hash)).
delegate(RemoteNodes) -> delegate(RemoteNodes) ->
case get(delegate) of case get(delegate) of
undefined -> Name = undefined -> Name = delegate_name(
delegate_name(erlang:phash2( erlang:phash2(self(),
self(), delegate_count(RemoteNodes))), delegate_sup:count(RemoteNodes))),
put(delegate, Name), put(delegate, Name),
Name; Name;
Name -> Name Name -> Name

View File

@ -18,7 +18,7 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/0]). -export([start_link/1, count/1]).
-export([init/1]). -export([init/1]).
@ -28,20 +28,32 @@
-ifdef(use_specs). -ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
-spec(count/1 :: ([node()]) -> integer()).
-endif. -endif.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
start_link() -> start_link(Count) ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []). supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
count([]) ->
1;
count([Node | Nodes]) ->
try
length(supervisor:which_children({?SERVER, Node}))
catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
count(Nodes);
exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
R =:= nodedown ->
count(Nodes)
end.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
init(_Args) -> init([Count]) ->
DCount = delegate:delegate_count([node()]),
{ok, {{one_for_one, 10, 10}, {ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]}, [{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} || transient, 16#ffffffff, worker, [delegate]} ||
Num <- lists:seq(0, DCount - 1)]}}. Num <- lists:seq(0, Count - 1)]}}.

View File

@ -58,6 +58,15 @@
%% hibernate the process immediately, as it would if backoff wasn't %% hibernate the process immediately, as it would if backoff wasn't
%% being used. Instead it'll wait for the current timeout as described %% being used. Instead it'll wait for the current timeout as described
%% above. %% above.
%%
%% 7) The callback module can return from any of the handle_*
%% functions, a {become, Module, State} triple, or a {become, Module,
%% State, Timeout} quadruple. This allows the gen_server to
%% dynamically change the callback module. The State is the new state
%% which will be passed into any of the callback functions in the new
%% module. Note there is no form also encompassing a reply, thus if
%% you wish to reply in handle_call/3 and change the callback module,
%% you need to use gen_server2:reply/2 to issue the reply manually.
%% All modifications are (C) 2009-2011 VMware, Inc. %% All modifications are (C) 2009-2011 VMware, Inc.
@ -880,6 +889,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
loop(GS2State #gs2_state { state = NState, loop(GS2State #gs2_state { state = NState,
time = Time1, time = Time1,
debug = Debug1 }); debug = Debug1 });
{become, Mod, NState} ->
Debug1 = common_debug(Debug, fun print_event/3, Name,
{become, Mod, NState}),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = infinity,
debug = Debug1 }));
{become, Mod, NState, Time1} ->
Debug1 = common_debug(Debug, fun print_event/3, Name,
{become, Mod, NState}),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = Time1,
debug = Debug1 }));
_ -> _ ->
handle_common_termination(Reply, Msg, GS2State) handle_common_termination(Reply, Msg, GS2State)
end. end.

View File

@ -27,7 +27,7 @@
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Boot steps. %% Boot steps.
-export([maybe_insert_default_data/0]). -export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check, -rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"}, [{description, "codec correctness check"},
@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup, -rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"}, [{description, "cluster delegate"},
{mfa, {rabbit_sup, start_child, {mfa, {rabbit, boot_delegate, []}},
[delegate_sup]}},
{requires, kernel_ready}, {requires, kernel_ready},
{enables, core_initialized}]}). {enables, core_initialized}]}).
@ -184,6 +183,9 @@
{running_nodes, [node()]}]). {running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-endif. -endif.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -454,6 +456,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end end
end. end.
boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_child(delegate_sup, [Count]).
maybe_insert_default_data() -> maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of case rabbit_mnesia:is_db_empty() of
true -> insert_default_data(); true -> insert_default_data();

View File

@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:const(not_found) rabbit_misc:const(not_found)
end; end;
[ExistingQ = #amqqueue{pid = QPid}] -> [ExistingQ = #amqqueue{pid = QPid}] ->
case is_process_alive(QPid) of case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ); true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName), false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end fun (Tx) -> TailFun(Tx), ExistingQ end
@ -300,29 +300,19 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w", "invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error]) [Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <- end || {Key, Fun} <-
[{<<"x-expires">>, fun check_expires_argument/1}, [{<<"x-expires">>, fun check_integer_argument/1},
{<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], {<<"x-message-ttl">>, fun check_integer_argument/1}]],
ok. ok.
check_expires_argument(Val) -> check_integer_argument(undefined) ->
check_integer_argument(Val,
expires_not_of_acceptable_type,
expires_zero_or_less).
check_message_ttl_argument(Val) ->
check_integer_argument(Val,
ttl_not_of_acceptable_type,
ttl_zero_or_less).
check_integer_argument(undefined, _, _) ->
ok; ok;
check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> check_integer_argument({Type, Val}) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok; true -> ok;
false -> {error, {InvalidTypeError, Type, Val}} false -> {error, {unacceptable_type, Type}}
end; end;
check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> check_integer_argument({_Type, Val}) ->
{error, ZeroOrLessError}. {error, {value_zero_or_less, Val}}.
list(VHostPath) -> list(VHostPath) ->
mnesia:dirty_match_object( mnesia:dirty_match_object(
@ -422,7 +412,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity). infinity).
notify_sent(QPid, ChPid) -> notify_sent(QPid, ChPid) ->
delegate_cast(QPid, {notify_sent, ChPid}). gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) -> unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}). delegate_cast(QPid, {unblock, ChPid}).

View File

@ -21,7 +21,7 @@
-behaviour(gen_server2). -behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100). -define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds -define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES, -define(BASE_MESSAGE_PROPERTIES,
@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) -> terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions? %% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) -> terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS), BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete %% don't care if the internal delete
%% doesn't return 'ok'. %% doesn't return 'ok'.
@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) ->
end, BQS, all_ch_record()), end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag) [emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)], || {Ch, CTag, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)} State1#q{backing_queue_state = Fun(BQS1)}
end. end.
@ -657,13 +658,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}. #message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) -> drop_expired_messages(State = #q{ttl = undefined}) ->
State; State;
drop_expired_messages(State = #q{backing_queue_state = BQS, drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) -> backing_queue = BQ}) ->
Now = now_millis(), Now = now_micros(),
BQS1 = BQ:dropwhile( BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> fun (#message_properties{expiry = Expiry}) ->
Now > Expiry Now > Expiry
@ -684,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) -> ensure_ttl_timer(State) ->
State. State.
now_millis() -> timer:now_diff(now(), {0,0,0}). now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@ -790,20 +791,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From, handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of case rabbit_misc:is_process_alive(Owner) of
true -> erlang:monitor(process, Owner), true -> erlang:monitor(process, Owner),
declare(Recover, From, State); declare(Recover, From, State);
_ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, false -> #q{backing_queue = BQ, backing_queue_state = undefined,
backing_queue = BQ, backing_queue_state = undefined} = State, q = #amqqueue{name = QName, durable = IsDurable}} = State,
gen_server2:reply(From, not_found), gen_server2:reply(From, not_found),
case Recover of case Recover of
true -> ok; true -> ok;
_ -> rabbit_log:warning( _ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName]) "Queue ~p exclusive owner went away~n", [QName])
end, end,
BQS = BQ:init(QName, IsDurable, Recover), BQS = BQ:init(QName, IsDurable, Recover),
%% Rely on terminate to delete the queue. %% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}} {stop, normal, State#q{backing_queue_state = BQS}}
end; end;
handle_call(info, _From, State) -> handle_call(info, _From, State) ->

View File

@ -45,22 +45,18 @@
start() -> start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
case FullCommand of
[] -> usage();
_ -> ok
end,
{[Command0 | Args], Opts} = {[Command0 | Args], Opts} =
rabbit_misc:get_options( case rabbit_misc:get_options([{flag, ?QUIET_OPT},
[{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, {option, ?NODE_OPT, NodeStr},
{option, ?VHOST_OPT, "/"}], {option, ?VHOST_OPT, "/"}],
FullCommand), init:get_plain_arguments()) of
Opts1 = lists:map(fun({K, V}) -> {[], _Opts} -> usage();
case K of CmdArgsAndOpts -> CmdArgsAndOpts
?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; end,
_ -> {K, V} Opts1 = [case K of
end ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
end, Opts), _ -> {K, V}
end || {K, V} <- Opts],
Command = list_to_atom(Command0), Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1),

View File

@ -16,7 +16,7 @@
-module(rabbit_direct). -module(rabbit_direct).
-export([boot/0, connect/3, start_channel/5]). -export([boot/0, connect/4, start_channel/5]).
-include("rabbit.hrl"). -include("rabbit.hrl").
@ -25,7 +25,7 @@
-ifdef(use_specs). -ifdef(use_specs).
-spec(boot/0 :: () -> 'ok'). -spec(boot/0 :: () -> 'ok').
-spec(connect/3 :: (binary(), binary(), binary()) -> -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(), {'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}). rabbit_framing:amqp_table()}}).
-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), -spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
@ -49,13 +49,14 @@ boot() ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
connect(Username, Password, VHost) -> connect(Username, Password, VHost, Protocol) ->
case lists:keymember(rabbit, 1, application:which_applications()) of case lists:keymember(rabbit, 1, application:which_applications()) of
true -> true ->
try rabbit_access_control:user_pass_login(Username, Password) of try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User -> #user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of try rabbit_access_control:check_vhost_access(User, VHost) of
ok -> {ok, {User, rabbit_reader:server_properties()}} ok -> {ok, {User,
rabbit_reader:server_properties(Protocol)}}
catch catch
exit:#amqp_error{name = access_refused} -> exit:#amqp_error{name = access_refused} ->
{error, access_refused} {error, access_refused}

View File

@ -56,6 +56,7 @@
-export([lock_file/1]). -export([lock_file/1]).
-export([const_ok/1, const/1]). -export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]). -export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)). -spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()).
-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif. -endif.
@ -861,3 +863,12 @@ ntoab(IP) ->
0 -> Str; 0 -> Str;
_ -> "[" ++ Str ++ "]" _ -> "[" ++ Str ++ "]"
end. end.
is_process_alive(Pid) when node(Pid) =:= node() ->
erlang:is_process_alive(Pid);
is_process_alive(Pid) ->
case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
true -> true;
_ -> false
end.

View File

@ -33,7 +33,7 @@
-include("rabbit_msg_store.hrl"). -include("rabbit_msg_store.hrl").
-define(SYNC_INTERVAL, 5). %% milliseconds -define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot"). -define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets").

View File

@ -32,16 +32,6 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl"). -include_lib("kernel/include/inet.hrl").
-define(RABBIT_TCP_OPTS, [
binary,
{packet, raw}, % no packaging
{reuseaddr, true}, % allow rebind without waiting
{backlog, 128}, % use the maximum listen(2) backlog value
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
%% {delay_send, true},
{exit_on_close, false}
]).
-define(SSL_TIMEOUT, 5). %% seconds -define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000). -define(FIRST_TEST_BIND_PORT, 10000).
@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
rabbit_sup, rabbit_sup,
{Name, {Name,
{tcp_listener_sup, start_link, {tcp_listener_sup, start_link,
[IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]}, OnConnect, Label]},
@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes). %% There are three kinds of machine (for our purposes).

View File

@ -24,7 +24,7 @@
-export([init/4, mainloop/2]). -export([init/4, mainloop/2]).
-export([conserve_memory/2, server_properties/0]). -export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client -export([process_channel_frame/5]). %% used by erlang-client
@ -160,7 +160,8 @@
-spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy %% These specs only exists to add no_return() to keep dialyzer happy
-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
@ -219,7 +220,7 @@ conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve}, Pid ! {conserve_memory, Conserve},
ok. ok.
server_properties() -> server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, id), {ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn), {ok, Version} = application:get_key(rabbit, vsn),
@ -230,22 +231,30 @@ server_properties() ->
%% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
%% from the config and merge them with the generated built-in properties %% from the config and merge them with the generated built-in properties
NormalizedConfigServerProps = NormalizedConfigServerProps =
[case X of [{<<"capabilities">>, table, server_capabilities(Protocol)} |
{KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), [case X of
longstr, {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
list_to_binary(Value)}; longstr,
{BinKey, Type, Value} -> {BinKey, Type, Value} list_to_binary(Value)};
end || X <- RawConfigServerProps ++ {BinKey, Type, Value} -> {BinKey, Type, Value}
[{product, Product}, end || X <- RawConfigServerProps ++
{version, Version}, [{product, Product},
{platform, "Erlang/OTP"}, {version, Version},
{copyright, ?COPYRIGHT_MESSAGE}, {platform, "Erlang/OTP"},
{information, ?INFORMATION_MESSAGE}]], {copyright, ?COPYRIGHT_MESSAGE},
{information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favor of config file provided values %% Filter duplicated properties in favor of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps). NormalizedConfigServerProps).
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
{<<"basic.nack">>, bool, true}];
server_capabilities(_) ->
[].
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) -> socket_op(Sock, Fun) ->
@ -655,7 +664,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Start = #'connection.start'{ Start = #'connection.start'{
version_major = ProtocolMajor, version_major = ProtocolMajor,
version_minor = ProtocolMinor, version_minor = ProtocolMinor,
server_properties = server_properties(), server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(), mechanisms = auth_mechanisms_binary(),
locales = <<"en_US">> }, locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol), ok = send_on_channel0(Sock, Start, Protocol),

View File

@ -82,6 +82,7 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode), passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode), passed = test_delegates_sync(SecondaryNode),
passed = test_queue_cleanup(SecondaryNode), passed = test_queue_cleanup(SecondaryNode),
passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the %% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate %% local node picks up more of the delegate
@ -90,13 +91,14 @@ run_cluster_dependent_tests(SecondaryNode) ->
Remote = spawn(SecondaryNode, Remote = spawn(SecondaryNode,
fun () -> Rs = [ test_delegates_async(Node), fun () -> Rs = [ test_delegates_async(Node),
test_delegates_sync(Node), test_delegates_sync(Node),
test_queue_cleanup(Node) ], test_queue_cleanup(Node),
test_declare_on_dead_queue(Node) ],
Self ! {self(), Rs} Self ! {self(), Rs}
end), end),
receive receive
{Remote, Result} -> {Remote, Result} ->
Result = [passed, passed, passed] Result = lists:duplicate(length(Result), passed)
after 2000 -> after 30000 ->
throw(timeout) throw(timeout)
end, end,
@ -1310,6 +1312,32 @@ test_queue_cleanup(_SecondaryNode) ->
end, end,
passed. passed.
test_declare_on_dead_queue(SecondaryNode) ->
QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
Self = self(),
Pid = spawn(SecondaryNode,
fun () ->
{new, #amqqueue{name = QueueName, pid = QPid}} =
rabbit_amqqueue:declare(QueueName, false, false, [],
none),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
receive
{Pid, killed, QPid} ->
{existing, #amqqueue{name = QueueName,
pid = QPid}} =
rabbit_amqqueue:declare(QueueName, false, false, [], none),
false = rabbit_misc:is_process_alive(QPid),
{new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
none),
true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
after 2000 ->
throw(failed_to_create_and_kill_queue)
end.
%--------------------------------------------------------------------- %---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, Args) ->
@ -2173,9 +2201,11 @@ test_configurable_server_properties() ->
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
<<"copyright">>, <<"information">>], <<"copyright">>, <<"information">>],
Protocol = rabbit_framing_amqp_0_9_1,
%% Verify that the built-in properties are initially present %% Verify that the built-in properties are initially present
ActualPropNames = [Key || ActualPropNames = [Key || {Key, longstr, _} <-
{Key, longstr, _} <- rabbit_reader:server_properties()], rabbit_reader:server_properties(Protocol)],
true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
BuiltInPropNames), BuiltInPropNames),
@ -2186,9 +2216,10 @@ test_configurable_server_properties() ->
ConsProp = fun (X) -> application:set_env(rabbit, ConsProp = fun (X) -> application:set_env(rabbit,
server_properties, server_properties,
[X | ServerProperties]) end, [X | ServerProperties]) end,
IsPropPresent = fun (X) -> lists:member(X, IsPropPresent =
rabbit_reader:server_properties()) fun (X) ->
end, lists:member(X, rabbit_reader:server_properties(Protocol))
end,
%% Add a wholly new property of the simplified {KeyAtom, StringValue} form %% Add a wholly new property of the simplified {KeyAtom, StringValue} form
NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
@ -2211,7 +2242,7 @@ test_configurable_server_properties() ->
{BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
list_to_binary(NewVerVal)}, list_to_binary(NewVerVal)},
ConsProp(NewVersion), ConsProp(NewVersion),
ClobberedServerProps = rabbit_reader:server_properties(), ClobberedServerProps = rabbit_reader:server_properties(Protocol),
%% Is the clobbering insert present? %% Is the clobbering insert present?
true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
%% Is the clobbering insert the only thing with the clobbering key? %% Is the clobbering insert the only thing with the clobbering key?