Merged default into bug19250

This commit is contained in:
Ben Hood 2008-10-16 13:03:49 +01:00
commit dbf935be37
6 changed files with 85 additions and 71 deletions

View File

@ -5,7 +5,7 @@ SOURCE_TARBALL_DIR=../../../dist
TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
TOP_DIR=$(shell pwd)
RPM_VERSION=$(shell echo $(VERSION) | tr - _)
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)'
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1'
rpms: clean server

View File

@ -6,6 +6,10 @@ Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz
URL: http://www.rabbitmq.com/
Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd.
%if 0%{?debian}
%else
BuildRequires: python, python-json
%endif
Requires: erlang, logrotate
Packager: Hubert Plociniczak <hubert@lshift.net>
BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root
@ -18,13 +22,11 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
%define _mandir /usr/share/man
%define _sbindir /usr/sbin
%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version}
%pre
if [ $1 -gt 1 ]; then
#Upgrade - stop and remove previous instance of rabbitmq-server init.d script

View File

@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python
Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json
Standards-Version: 3.7.2
Package: rabbitmq-server

View File

@ -28,12 +28,12 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
commit/2, rollback/2]).
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, notify_down/2]).
-export([notify_sent/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
-import(mnesia).
@ -44,6 +44,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-define(CALL_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@ -53,6 +55,9 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(bind_res() :: {'ok', non_neg_integer()} |
{'error', 'queue_not_found' | 'exchange_not_found'}).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@ -81,9 +86,9 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit/2 :: (pid(), txn()) -> 'ok').
-spec(rollback/2 :: (pid(), txn()) -> 'ok').
-spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit(QPid, Txn) ->
gen_server:call(QPid, {commit, Txn}).
commit_all(QPids, Txn) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
QPids).
rollback(QPid, Txn) ->
gen_server:cast(QPid, {rollback, Txn}).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down(#amqqueue{ pid = QPid }, ChPid) ->
gen_server:call(QPid, {notify_down, ChPid}).
notify_down_all(QPids, ChPid) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) ->
rabbit_misc:with_exit_handler(
%% we don't care if the queue process has terminated
%% in the meantime
fun () -> ok end,
fun () -> gen_server:call(QPid, {notify_down, ChPid},
Timeout) end)
end,
QPids).
binding_forcibly_removed(BindingSpec, QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
binding_specs = [],
pid = Pid}.
safe_pmap_ok(F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
try F(V)
catch Class:Reason -> {Class, Reason}
end
end, L),
R =/= ok] of
[] -> ok;
Errors -> {error, Errors}
end.

View File

@ -731,21 +731,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
make_tx_id() -> rabbit_misc:guid().
safe_pmap_set_ok(F, S) ->
case lists:filter(fun (R) -> R =/= ok end,
rabbit_misc:upmap(
fun (V) ->
try F(V)
catch Class:Reason -> {Class, Reason}
end
end, sets:to_list(S))) of
[] -> ok;
Errors -> {error, Errors}
end.
notify_participants(F, TxnKey, Participants) ->
safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
tx_participants = sets:new(),
@ -753,8 +738,8 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case notify_participants(fun rabbit_amqqueue:commit/2,
TxnKey, Participants) of
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
ok -> new_tx(State);
{error, Errors} -> exit({commit_failed, Errors})
end.
@ -767,8 +752,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
case notify_participants(fun rabbit_amqqueue:rollback/2,
TxnKey, Participants) of
case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> exit({rollback_failed, Errors})
@ -791,23 +776,18 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
safe_pmap_set_ok(
fun (QueueName) ->
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
rabbit_amqqueue:notify_down(Q, ProxyPid)
end) of
ok ->
ok;
{error, not_found} ->
%% queue has been deleted in the meantime
ok
end
end,
dict:fold(fun (_ConsumerTag, QueueName, S) ->
sets:add_element(QueueName, S)
end, sets:new(), Consumers)).
rabbit_amqqueue:notify_down_all(
[QPid || QueueName <-
sets:to_list(
dict:fold(fun (_ConsumerTag, QueueName, S) ->
sets:add_element(QueueName, S)
end, sets:new(), Consumers)),
case rabbit_amqqueue:lookup(QueueName) of
{ok, Q} -> QPid = Q#amqqueue.pid, true;
%% queue has been deleted in the meantime
{error, not_found} -> QPid = none, false
end],
ProxyPid).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->

View File

@ -94,10 +94,18 @@
%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit ->
%% if abnormal exit then log error
%% if last channel to exit then send connection.close_ok, start
%% terminate_connection timer, *closing*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
%% -> log error, start terminate_channel timer, mark channel as
%% closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
%% channel exits normally
%% -> if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
%% receive connection.close_ok -> self() ! terminate_connection,
@ -291,24 +299,13 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
handle_dependent_exit(Pid, Reason,
State = #v1{connection_state = closing}) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
Channel ->
case Reason of
normal -> ok;
_ -> log_channel_error(closing, Channel, Reason)
end,
maybe_close(State)
end;
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
State;
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
Channel -> handle_exception(State, Channel, Reason)
Channel -> maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(Pid) ->
@ -365,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
maybe_close(State) ->
maybe_close(State = #v1{connection_state = closing}) ->
case all_channels() of
[] -> ok = send_on_channel0(
State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State
end.
end;
maybe_close(State) ->
State.
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->