Merged default into 18776
This commit is contained in:
commit
6a6665ccd5
|
|
@ -4,32 +4,23 @@ VERSION=0.0.0
|
|||
SOURCE_TARBALL_DIR=../../../dist
|
||||
TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
|
||||
TOP_DIR=$(shell pwd)
|
||||
RPM_VERSION=$(shell echo $(VERSION) | tr - _)
|
||||
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)'
|
||||
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1'
|
||||
|
||||
rpms: clean server
|
||||
|
||||
#Create proper environment for making rpms
|
||||
prepare:
|
||||
mkdir -p $(TOP_DIR)/BUILD
|
||||
mkdir -p $(TOP_DIR)/SOURCES
|
||||
mkdir -p $(TOP_DIR)/SPECS
|
||||
mkdir -p $(TOP_DIR)/SRPMS
|
||||
mkdir -p $(TOP_DIR)/RPMS
|
||||
mkdir -p $(TOP_DIR)/tmp
|
||||
cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES
|
||||
cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS
|
||||
cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD
|
||||
cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD
|
||||
cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD
|
||||
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
|
||||
cp $(TOP_DIR)/$(TARBALL) SOURCES
|
||||
cp rabbitmq-server.spec SPECS
|
||||
sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec
|
||||
|
||||
cp init.d SOURCES/rabbitmq-server.init
|
||||
cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper
|
||||
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
|
||||
|
||||
server: prepare
|
||||
rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch
|
||||
rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch
|
||||
|
||||
clean:
|
||||
rm -rf $(TOP_DIR)/SOURCES/
|
||||
rm -rf $(TOP_DIR)/SPECS/
|
||||
rm -rf $(TOP_DIR)/RPMS/
|
||||
rm -rf $(TOP_DIR)/SRPMS/
|
||||
rm -rf $(TOP_DIR)/BUILD/
|
||||
rm -rf $(TOP_DIR)/tmp/
|
||||
rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
|
||||
|
|
|
|||
|
|
@ -9,4 +9,4 @@
|
|||
postrotate
|
||||
/sbin/service rabbitmq-server rotate-logs
|
||||
endscript
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,14 +1,21 @@
|
|||
Name: rabbitmq-server
|
||||
Version: %{rpm_version}
|
||||
Version: %%VERSION%%
|
||||
Release: 1
|
||||
License: MPLv1.1
|
||||
Group: Development/Libraries
|
||||
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz
|
||||
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
|
||||
Source1: rabbitmq-server.init
|
||||
Source2: rabbitmq-server.wrapper
|
||||
Source3: rabbitmq-server.logrotate
|
||||
URL: http://www.rabbitmq.com/
|
||||
Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd.
|
||||
%if 0%{?debian}
|
||||
%else
|
||||
BuildRequires: python, python-json
|
||||
%endif
|
||||
Requires: erlang, logrotate
|
||||
Packager: Hubert Plociniczak <hubert@lshift.net>
|
||||
BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root
|
||||
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
|
||||
Summary: The RabbitMQ server
|
||||
Requires(post): chkconfig
|
||||
Requires(pre): chkconfig initscripts
|
||||
|
|
@ -19,10 +26,8 @@ performance enterprise messaging. The RabbitMQ server is a robust and
|
|||
scalable implementation of an AMQP broker.
|
||||
|
||||
|
||||
%define _mandir /usr/share/man
|
||||
%define _sbindir /usr/sbin
|
||||
%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
|
||||
%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version}
|
||||
%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
|
||||
%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version}
|
||||
|
||||
|
||||
%pre
|
||||
|
|
@ -33,7 +38,7 @@ if [ $1 -gt 1 ]; then
|
|||
fi
|
||||
|
||||
%prep
|
||||
%setup -n %{name}-%{main_version}
|
||||
%setup -n %{name}-%{version}
|
||||
|
||||
%build
|
||||
make
|
||||
|
|
@ -44,24 +49,23 @@ rm -rf %{buildroot}
|
|||
make install TARGET_DIR=%{_maindir} \
|
||||
SBIN_DIR=%{buildroot}%{_sbindir} \
|
||||
MAN_DIR=%{buildroot}%{_mandir}
|
||||
VERSION=%{main_version}
|
||||
VERSION=%{version}
|
||||
|
||||
mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia
|
||||
mkdir -p %{buildroot}/var/log/rabbitmq
|
||||
mkdir -p %{buildroot}/etc/rc.d/init.d/
|
||||
|
||||
#Copy all necessary lib files etc.
|
||||
cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server
|
||||
install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
|
||||
chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
|
||||
|
||||
mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real
|
||||
cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl
|
||||
chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl
|
||||
install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl
|
||||
|
||||
cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz
|
||||
|
||||
mkdir -p %{buildroot}/etc/logrotate.d
|
||||
cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server
|
||||
install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server
|
||||
|
||||
%post
|
||||
# create rabbitmq group
|
||||
|
|
@ -93,7 +97,7 @@ fi
|
|||
|
||||
%files
|
||||
%defattr(-,root,root,-)
|
||||
%{_libdir}/rabbitmq_server-%{main_version}/
|
||||
%{_erllibdir}/rabbitmq_server-%{version}/
|
||||
%{_mandir}/man1/rabbitmq-multi.1.gz
|
||||
%{_mandir}/man1/rabbitmq-server.1.gz
|
||||
%{_mandir}/man1/rabbitmqctl.1.gz
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
TARBALL_DIR=../../../dist
|
||||
TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz))
|
||||
DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
|
||||
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
|
||||
UNPACKED_DIR=rabbitmq-server-$(VERSION)
|
||||
PACKAGENAME=rabbitmq-server
|
||||
|
|
@ -16,7 +17,8 @@ all:
|
|||
|
||||
package: clean
|
||||
make -C ../.. check_tools
|
||||
tar -zxvf $(TARBALL_DIR)/$(TARBALL)
|
||||
cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL)
|
||||
tar -zxvf $(DEBIAN_ORIG_TARBALL)
|
||||
cp -r debian $(UNPACKED_DIR)
|
||||
chmod a+x $(UNPACKED_DIR)/debian/rules
|
||||
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ Source: rabbitmq-server
|
|||
Section: net
|
||||
Priority: extra
|
||||
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
|
||||
Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python
|
||||
Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json
|
||||
Standards-Version: 3.7.2
|
||||
|
||||
Package: rabbitmq-server
|
||||
|
|
|
|||
|
|
@ -9,4 +9,4 @@
|
|||
postrotate
|
||||
/etc/init.d/rabbitmq-server rotate-logs
|
||||
endscript
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,8 +66,10 @@ erl \
|
|||
-sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \
|
||||
-os_mon start_cpu_sup true \
|
||||
-os_mon start_disksup false \
|
||||
-os_mon start_memsup false \
|
||||
-os_mon start_memsup true \
|
||||
-os_mon start_os_sup false \
|
||||
-os_mon memsup_system_only true \
|
||||
-os_mon system_memory_high_watermark 0.95 \
|
||||
-mnesia dir "\"${MNESIA_DIR}\"" \
|
||||
${CLUSTER_CONFIG} \
|
||||
${RABBIT_ARGS} \
|
||||
|
|
|
|||
|
|
@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia
|
|||
-sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
|
||||
-os_mon start_cpu_sup true ^
|
||||
-os_mon start_disksup false ^
|
||||
-os_mon start_memsup false ^
|
||||
-os_mon start_memsup true ^
|
||||
-os_mon start_os_sup false ^
|
||||
-os_mon memsup_system_only true ^
|
||||
-os_mon system_memory_high_watermark 0.95 ^
|
||||
-mnesia dir \""%MNESIA_DIR%"\" ^
|
||||
%CLUSTER_CONFIG% ^
|
||||
%RABBIT_ARGS% ^
|
||||
|
|
|
|||
|
|
@ -32,6 +32,8 @@
|
|||
-export([mainloop/4, drain/2]).
|
||||
-export([proxy_loop/3]).
|
||||
|
||||
-define(HIBERNATE_AFTER, 5000).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
start_link(M, A) ->
|
||||
|
|
@ -40,7 +42,8 @@ start_link(M, A) ->
|
|||
ProxyPid = self(),
|
||||
Ref = make_ref(),
|
||||
Pid = spawn_link(
|
||||
fun () -> mainloop(ProxyPid, Ref, M,
|
||||
fun () -> ProxyPid ! Ref,
|
||||
mainloop(ProxyPid, Ref, M,
|
||||
M:init(ProxyPid, A)) end),
|
||||
proxy_loop(Ref, Pid, empty)
|
||||
end).
|
||||
|
|
@ -48,14 +51,19 @@ start_link(M, A) ->
|
|||
%%----------------------------------------------------------------------------
|
||||
|
||||
mainloop(ProxyPid, Ref, M, State) ->
|
||||
ProxyPid ! Ref,
|
||||
NewState =
|
||||
receive
|
||||
{Ref, Messages} ->
|
||||
lists:foldl(fun (Msg, S) ->
|
||||
drain(M, M:handle_message(Msg, S))
|
||||
end, State, lists:reverse(Messages));
|
||||
NewSt =
|
||||
lists:foldl(fun (Msg, S) ->
|
||||
drain(M, M:handle_message(Msg, S))
|
||||
end, State, lists:reverse(Messages)),
|
||||
ProxyPid ! Ref,
|
||||
NewSt;
|
||||
Msg -> M:handle_message(Msg, State)
|
||||
after ?HIBERNATE_AFTER ->
|
||||
erlang:hibernate(?MODULE, mainloop,
|
||||
[ProxyPid, Ref, M, State])
|
||||
end,
|
||||
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
|
||||
|
||||
|
|
@ -89,4 +97,6 @@ proxy_loop(Ref, Pid, State) ->
|
|||
waiting -> Pid ! {Ref, [Msg]}, empty;
|
||||
Messages -> [Msg | Messages]
|
||||
end)
|
||||
after ?HIBERNATE_AFTER ->
|
||||
erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -157,6 +157,8 @@ start(normal, []) ->
|
|||
|
||||
ok = rabbit_amqqueue:start(),
|
||||
|
||||
ok = rabbit_alarm:start(),
|
||||
|
||||
ok = rabbit_binary_generator:
|
||||
check_empty_content_body_frame_size(),
|
||||
|
||||
|
|
@ -198,6 +200,7 @@ start(normal, []) ->
|
|||
|
||||
stop(_State) ->
|
||||
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
|
||||
ok = rabbit_alarm:stop(),
|
||||
ok.
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -0,0 +1,125 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License at
|
||||
%% http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
||||
%% License for the specific language governing rights and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developers of the Original Code are LShift Ltd.,
|
||||
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
|
||||
%%
|
||||
%% Portions created by LShift Ltd., Cohesive Financial Technologies
|
||||
%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
|
||||
%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
|
||||
%% Technologies Ltd.;
|
||||
%%
|
||||
%% All Rights Reserved.
|
||||
%%
|
||||
%% Contributor(s): ______________________________________.
|
||||
%%
|
||||
|
||||
-module(rabbit_alarm).
|
||||
|
||||
-behaviour(gen_event).
|
||||
|
||||
-export([start/0, stop/0, register/2]).
|
||||
|
||||
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-define(MEMSUP_CHECK_INTERVAL, 1000).
|
||||
|
||||
-record(alarms, {alertees, system_memory_high_watermark = false}).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-spec(start/0 :: () -> 'ok').
|
||||
-spec(stop/0 :: () -> 'ok').
|
||||
-spec(register/2 :: (pid(), mfa()) -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
start() ->
|
||||
%% The default memsup check interval is 1 minute, which is way too
|
||||
%% long - rabbit can gobble up all memory in a matter of
|
||||
%% seconds. Unfortunately the memory_check_interval configuration
|
||||
%% parameter and memsup:set_check_interval/1 function only provide
|
||||
%% a granularity of minutes. So we have to peel off one layer of
|
||||
%% the API to get to the underlying layer which operates at the
|
||||
%% granularity of milliseconds.
|
||||
%%
|
||||
%% Note that the new setting will only take effect after the first
|
||||
%% check has completed, i.e. after one minute. So if rabbit eats
|
||||
%% all the memory within the first minute after startup then we
|
||||
%% are out of luck.
|
||||
ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL},
|
||||
infinity),
|
||||
|
||||
ok = alarm_handler:add_alarm_handler(?MODULE).
|
||||
|
||||
stop() ->
|
||||
ok = alarm_handler:delete_alarm_handler(?MODULE).
|
||||
|
||||
register(Pid, HighMemMFA) ->
|
||||
ok = gen_event:call(alarm_handler, ?MODULE,
|
||||
{register, Pid, HighMemMFA}).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, #alarms{alertees = dict:new()}}.
|
||||
|
||||
handle_call({register, Pid, HighMemMFA},
|
||||
State = #alarms{alertees = Alertess}) ->
|
||||
_MRef = erlang:monitor(process, Pid),
|
||||
case State#alarms.system_memory_high_watermark of
|
||||
true -> {M, F, A} = HighMemMFA,
|
||||
ok = erlang:apply(M, F, A ++ [Pid, true]);
|
||||
false -> ok
|
||||
end,
|
||||
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
|
||||
{ok, ok, State#alarms{alertees = NewAlertees}};
|
||||
|
||||
handle_call(_Request, State) ->
|
||||
{ok, not_understood, State}.
|
||||
|
||||
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
|
||||
ok = alert(true, State#alarms.alertees),
|
||||
{ok, State#alarms{system_memory_high_watermark = true}};
|
||||
|
||||
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
|
||||
ok = alert(false, State#alarms.alertees),
|
||||
{ok, State#alarms{system_memory_high_watermark = false}};
|
||||
|
||||
handle_event(_Event, State) ->
|
||||
{ok, State}.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, Pid, _Reason},
|
||||
State = #alarms{alertees = Alertess}) ->
|
||||
{ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{ok, State}.
|
||||
|
||||
terminate(_Arg, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
alert(Alert, Alertees) ->
|
||||
dict:fold(fun (Pid, {M, F, A}, Acc) ->
|
||||
ok = erlang:apply(M, F, A ++ [Pid, Alert]),
|
||||
Acc
|
||||
end, ok, Alertees).
|
||||
|
|
@ -213,25 +213,23 @@ ack(QPid, Txn, MsgIds, ChPid) ->
|
|||
commit_all(QPids, Txn) ->
|
||||
Timeout = length(QPids) * ?CALL_TIMEOUT,
|
||||
safe_pmap_ok(
|
||||
fun (QPid) -> exit({queue_disappeared, QPid}) end,
|
||||
fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
|
||||
QPids).
|
||||
|
||||
rollback_all(QPids, Txn) ->
|
||||
safe_pmap_ok(
|
||||
fun (QPid) -> exit({queue_disappeared, QPid}) end,
|
||||
fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
|
||||
QPids).
|
||||
|
||||
notify_down_all(QPids, ChPid) ->
|
||||
Timeout = length(QPids) * ?CALL_TIMEOUT,
|
||||
safe_pmap_ok(
|
||||
fun (QPid) ->
|
||||
rabbit_misc:with_exit_handler(
|
||||
%% we don't care if the queue process has terminated
|
||||
%% in the meantime
|
||||
fun () -> ok end,
|
||||
fun () -> gen_server:call(QPid, {notify_down, ChPid},
|
||||
Timeout) end)
|
||||
end,
|
||||
%% we don't care if the queue process has terminated in the
|
||||
%% meantime
|
||||
fun (_) -> ok end,
|
||||
fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
|
||||
QPids).
|
||||
|
||||
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
|
||||
|
|
@ -286,10 +284,13 @@ pseudo_queue(QueueName, Pid) ->
|
|||
arguments = [],
|
||||
pid = Pid}.
|
||||
|
||||
safe_pmap_ok(F, L) ->
|
||||
safe_pmap_ok(H, F, L) ->
|
||||
case [R || R <- rabbit_misc:upmap(
|
||||
fun (V) ->
|
||||
try F(V)
|
||||
try
|
||||
rabbit_misc:with_exit_handler(
|
||||
fun () -> H(V) end,
|
||||
fun () -> F(V) end)
|
||||
catch Class:Reason -> {Class, Reason}
|
||||
end
|
||||
end, L),
|
||||
|
|
@ -297,4 +298,3 @@ safe_pmap_ok(F, L) ->
|
|||
[] -> ok;
|
||||
Errors -> {error, Errors}
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-define(UNSENT_MESSAGE_LIMIT, 100).
|
||||
-define(HIBERNATE_AFTER, 1000).
|
||||
|
||||
-export([start_link/1]).
|
||||
|
||||
|
|
@ -75,7 +76,7 @@ init(Q) ->
|
|||
has_had_consumers = false,
|
||||
next_msg_id = 1,
|
||||
message_buffer = queue:new(),
|
||||
round_robin = queue:new()}}.
|
||||
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
|
||||
|
||||
terminate(_Reason, State) ->
|
||||
%% FIXME: How do we cancel active subscriptions?
|
||||
|
|
@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
|
||||
|
||||
noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
|
||||
|
||||
lookup_ch(ChPid) ->
|
||||
case get({ch, ChPid}) of
|
||||
undefined -> not_found;
|
||||
|
|
@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
|
|||
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
|
||||
round_robin = ActiveConsumers}) ->
|
||||
case lookup_ch(DownPid) of
|
||||
not_found -> {noreply, State};
|
||||
not_found -> noreply(State);
|
||||
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
|
||||
NewActive = block_consumers(ChPid, ActiveConsumers),
|
||||
erlang:demonitor(MonitorRef),
|
||||
|
|
@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
|
|||
end,
|
||||
round_robin = NewActive})) of
|
||||
{continue, NewState} ->
|
||||
{noreply, NewState};
|
||||
noreply(NewState);
|
||||
{stop, NewState} ->
|
||||
{stop, normal, NewState}
|
||||
end
|
||||
|
|
@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
|
|||
%% queues discarding the message?
|
||||
%%
|
||||
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
|
||||
{reply, Delivered, NewState};
|
||||
reply(Delivered, NewState);
|
||||
|
||||
handle_call({deliver, Txn, Message}, _From, State) ->
|
||||
%% Synchronous, "mandatory" delivery mode
|
||||
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
|
||||
{reply, Delivered, NewState};
|
||||
reply(Delivered, NewState);
|
||||
|
||||
handle_call({commit, Txn}, From, State) ->
|
||||
ok = commit_work(Txn, qname(State)),
|
||||
|
|
@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
|
|||
gen_server:reply(From, ok),
|
||||
NewState = process_pending(Txn, State),
|
||||
erase_tx(Txn),
|
||||
{noreply, NewState};
|
||||
noreply(NewState);
|
||||
|
||||
handle_call({notify_down, ChPid}, From, State) ->
|
||||
%% optimisation: we reply straight away so the sender can continue
|
||||
|
|
@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
|
|||
persist_auto_ack(QName, Message)
|
||||
end,
|
||||
Msg = {QName, self(), NextId, Delivered, Message},
|
||||
{reply, {ok, queue:len(BufferTail), Msg},
|
||||
State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
|
||||
reply({ok, queue:len(BufferTail), Msg},
|
||||
State#q{message_buffer = BufferTail,
|
||||
next_msg_id = NextId + 1});
|
||||
{empty, _} ->
|
||||
{reply, empty, State}
|
||||
reply(empty, State)
|
||||
end;
|
||||
|
||||
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
|
||||
|
|
@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
|
|||
round_robin = RoundRobin}) ->
|
||||
case check_queue_owner(Owner, ReaderPid) of
|
||||
mismatch ->
|
||||
{reply, {error, queue_owned_by_another_connection}, State};
|
||||
reply({error, queue_owned_by_another_connection}, State);
|
||||
ok ->
|
||||
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
|
||||
in_use ->
|
||||
{reply, {error, exclusive_consume_unavailable}, State};
|
||||
reply({error, exclusive_consume_unavailable}, State);
|
||||
ok ->
|
||||
C = #cr{consumers = Consumers} = ch_record(ChPid),
|
||||
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
|
||||
|
|
@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
|
|||
end,
|
||||
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
|
||||
ok = maybe_send_reply(ChPid, OkMsg),
|
||||
{reply, ok, run_poke_burst(State1)}
|
||||
reply(ok, run_poke_burst(State1))
|
||||
end
|
||||
end;
|
||||
|
||||
|
|
@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
|
|||
case lookup_ch(ChPid) of
|
||||
not_found ->
|
||||
ok = maybe_send_reply(ChPid, OkMsg),
|
||||
{reply, ok, State};
|
||||
reply(ok, State);
|
||||
C = #cr{consumers = Consumers} ->
|
||||
NewConsumers = lists:filter
|
||||
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
|
||||
|
|
@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
|
|||
ConsumerTag,
|
||||
RoundRobin)}) of
|
||||
{continue, State1} ->
|
||||
{reply, ok, State1};
|
||||
reply(ok, State1);
|
||||
{stop, State1} ->
|
||||
{stop, normal, ok, State1}
|
||||
end
|
||||
|
|
@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
|
|||
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
|
||||
message_buffer = MessageBuffer,
|
||||
round_robin = RoundRobin}) ->
|
||||
{reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
|
||||
reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
|
||||
|
||||
handle_call({delete, IfUnused, IfEmpty}, _From,
|
||||
State = #q{message_buffer = MessageBuffer}) ->
|
||||
|
|
@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
|
|||
IsUnused = is_unused(),
|
||||
if
|
||||
IfEmpty and not(IsEmpty) ->
|
||||
{reply, {error, not_empty}, State};
|
||||
reply({error, not_empty}, State);
|
||||
IfUnused and not(IsUnused) ->
|
||||
{reply, {error, in_use}, State};
|
||||
reply({error, in_use}, State);
|
||||
true ->
|
||||
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
|
||||
end;
|
||||
|
||||
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
|
||||
ok = purge_message_buffer(qname(State), MessageBuffer),
|
||||
{reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
|
||||
reply({ok, queue:len(MessageBuffer)},
|
||||
State#q{message_buffer = queue:new()});
|
||||
|
||||
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
|
||||
exclusive_consumer = Holder}) ->
|
||||
|
|
@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
|
|||
%% to check, we'd need to hold not just the ch
|
||||
%% pid for each consumer, but also its reader
|
||||
%% pid...
|
||||
{reply, locked, State};
|
||||
reply(locked, State);
|
||||
ok ->
|
||||
{reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
|
||||
reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
|
||||
end;
|
||||
{ReaderPid, _MonitorRef} ->
|
||||
{reply, ok, State};
|
||||
reply(ok, State);
|
||||
_ ->
|
||||
{reply, locked, State}
|
||||
reply(locked, State)
|
||||
end.
|
||||
|
||||
handle_cast({deliver, Txn, Message}, State) ->
|
||||
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
|
||||
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
|
||||
{noreply, NewState};
|
||||
noreply(NewState);
|
||||
|
||||
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
|
||||
case lookup_ch(ChPid) of
|
||||
not_found ->
|
||||
{noreply, State};
|
||||
noreply(State);
|
||||
C = #cr{unacked_messages = UAM} ->
|
||||
{Acked, Remaining} = collect_messages(MsgIds, UAM),
|
||||
persist_acks(Txn, qname(State), Acked),
|
||||
|
|
@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
|
|||
_ ->
|
||||
record_pending_acks(Txn, ChPid, MsgIds)
|
||||
end,
|
||||
{noreply, State}
|
||||
noreply(State)
|
||||
end;
|
||||
|
||||
handle_cast({rollback, Txn}, State) ->
|
||||
ok = rollback_work(Txn, qname(State)),
|
||||
erase_tx(Txn),
|
||||
{noreply, State};
|
||||
noreply(State);
|
||||
|
||||
handle_cast({redeliver, Messages}, State) ->
|
||||
{noreply, deliver_or_enqueue_n(Messages, State)};
|
||||
noreply(deliver_or_enqueue_n(Messages, State));
|
||||
|
||||
handle_cast({requeue, MsgIds, ChPid}, State) ->
|
||||
case lookup_ch(ChPid) of
|
||||
not_found ->
|
||||
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
|
||||
[ChPid]),
|
||||
{noreply, State};
|
||||
noreply(State);
|
||||
C = #cr{unacked_messages = UAM} ->
|
||||
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
|
||||
store_ch_record(C#cr{unacked_messages = NewUAM}),
|
||||
{noreply, deliver_or_enqueue_n(
|
||||
[{Message, true} || Message <- Messages], State)}
|
||||
noreply(deliver_or_enqueue_n(
|
||||
[{Message, true} || Message <- Messages], State))
|
||||
end;
|
||||
|
||||
handle_cast({notify_sent, ChPid}, State) ->
|
||||
case lookup_ch(ChPid) of
|
||||
not_found -> {noreply, State};
|
||||
not_found -> noreply(State);
|
||||
T = #cr{unsent_message_count =Count} ->
|
||||
{noreply, possibly_unblock(
|
||||
T#cr{unsent_message_count = Count - 1},
|
||||
State)}
|
||||
noreply(possibly_unblock(
|
||||
T#cr{unsent_message_count = Count - 1},
|
||||
State))
|
||||
end.
|
||||
|
||||
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
|
||||
|
|
@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
|
|||
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
|
||||
handle_ch_down(DownPid, State);
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
{noreply, State, hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOGDEBUG("Info in queue: ~p~n", [Info]),
|
||||
{stop, {unhandled_info, Info}, State}.
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@
|
|||
-include("rabbit.hrl").
|
||||
|
||||
-export([start_link/4, do/2, do/3, shutdown/1]).
|
||||
-export([send_command/2, deliver/4]).
|
||||
-export([send_command/2, deliver/4, conserve_memory/2]).
|
||||
|
||||
%% callbacks
|
||||
-export([init/2, handle_message/2]).
|
||||
|
|
@ -49,6 +49,7 @@
|
|||
-spec(shutdown/1 :: (pid()) -> 'ok').
|
||||
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
|
||||
-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
|
||||
-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
|
|||
Pid ! {deliver, ConsumerTag, AckRequired, Msg},
|
||||
ok.
|
||||
|
||||
conserve_memory(Pid, Conserve) ->
|
||||
Pid ! {conserve_memory, Conserve},
|
||||
ok.
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
|
||||
init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
|
||||
process_flag(trap_exit, true),
|
||||
link(WriterPid),
|
||||
%% this is bypassing the proxy so alarms can "jump the queue" and
|
||||
%% be handled promptly
|
||||
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
|
||||
#ch{state = starting,
|
||||
proxy_pid = ProxyPid,
|
||||
reader_pid = ReaderPid,
|
||||
|
|
@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg},
|
|||
true, ConsumerTag, DeliveryTag, Msg),
|
||||
State1#ch{next_tag = DeliveryTag + 1};
|
||||
|
||||
handle_message({conserve_memory, Conserve}, State) ->
|
||||
ok = rabbit_writer:send_command(
|
||||
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
|
||||
State;
|
||||
|
||||
handle_message({'EXIT', _Pid, Reason}, State) ->
|
||||
terminate(Reason, State);
|
||||
|
||||
|
|
@ -572,29 +585,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
|
|||
exchange = ExchangeNameBin,
|
||||
routing_key = RoutingKey,
|
||||
nowait = NoWait,
|
||||
arguments = Arguments},
|
||||
_, State = #ch{ virtual_host = VHostPath }) ->
|
||||
%% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML)
|
||||
%% FIXME: don't allow binding to internal exchanges - including the one named "" !
|
||||
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
|
||||
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
|
||||
State),
|
||||
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
|
||||
case rabbit_exchange:add_binding(ExchangeName, QueueName,
|
||||
ActualRoutingKey, Arguments) of
|
||||
{error, queue_not_found} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
|
||||
{error, exchange_not_found} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
|
||||
{error, durability_settings_incompatible} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_allowed, "durability settings of ~s incompatible with ~s",
|
||||
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
|
||||
ok ->
|
||||
return_ok(State, NoWait, #'queue.bind_ok'{})
|
||||
end;
|
||||
arguments = Arguments}, _, State) ->
|
||||
binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
|
||||
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
|
||||
NoWait, State);
|
||||
|
||||
handle_method(#'queue.unbind'{queue = QueueNameBin,
|
||||
exchange = ExchangeNameBin,
|
||||
routing_key = RoutingKey,
|
||||
arguments = Arguments}, _, State) ->
|
||||
binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
|
||||
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
|
||||
false, State);
|
||||
|
||||
handle_method(#'queue.purge'{queue = QueueNameBin,
|
||||
nowait = NoWait},
|
||||
|
|
@ -630,12 +632,47 @@ handle_method(#'channel.flow'{active = _}, _, State) ->
|
|||
%% FIXME: implement
|
||||
{reply, #'channel.flow_ok'{active = true}, State};
|
||||
|
||||
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
|
||||
%% TODO: We may want to correlate this to channel.flow messages we
|
||||
%% have sent, and complain if we get an unsolicited
|
||||
%% channel.flow_ok, or the client refuses our flow request.
|
||||
{noreply, State};
|
||||
|
||||
handle_method(_MethodRecord, _Content, _State) ->
|
||||
rabbit_misc:protocol_error(
|
||||
command_invalid, "unimplemented method", []).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
|
||||
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
|
||||
%% FIXME: connection exception (!) on failure??
|
||||
%% (see rule named "failure" in spec-XML)
|
||||
%% FIXME: don't allow binding to internal exchanges -
|
||||
%% including the one named "" !
|
||||
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
|
||||
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
|
||||
State),
|
||||
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
|
||||
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
|
||||
{error, queue_not_found} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
|
||||
{error, exchange_not_found} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
|
||||
{error, binding_not_found} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_found, "no binding ~s between ~s and ~s",
|
||||
[RoutingKey, rabbit_misc:rs(ExchangeName),
|
||||
rabbit_misc:rs(QueueName)]);
|
||||
{error, durability_settings_incompatible} ->
|
||||
rabbit_misc:protocol_error(
|
||||
not_allowed, "durability settings of ~s incompatible with ~s",
|
||||
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
|
||||
ok -> return_ok(State, NoWait, ReturnMethod)
|
||||
end.
|
||||
|
||||
publish(Mandatory, Immediate, Message, QPids,
|
||||
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
|
||||
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
|
||||
|
|
@ -717,7 +754,8 @@ internal_commit(State = #ch{transaction_id = TxnKey,
|
|||
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
|
||||
TxnKey) of
|
||||
ok -> new_tx(State);
|
||||
{error, Errors} -> exit({commit_failed, Errors})
|
||||
{error, Errors} -> rabbit_misc:protocol_error(
|
||||
internal_error, "commit failed: ~w", [Errors])
|
||||
end.
|
||||
|
||||
internal_rollback(State = #ch{transaction_id = TxnKey,
|
||||
|
|
@ -732,7 +770,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
|
|||
TxnKey) of
|
||||
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
|
||||
new_tx(State#ch{unacked_message_q = NewUAMQ});
|
||||
{error, Errors} -> exit({rollback_failed, Errors})
|
||||
{error, Errors} -> rabbit_misc:protocol_error(
|
||||
internal_error, "rollback failed: ~w", [Errors])
|
||||
end.
|
||||
|
||||
fold_per_queue(F, Acc0, UAQ) ->
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@
|
|||
-export([dirty_read/1]).
|
||||
-export([r/3, r/2, rs/1]).
|
||||
-export([enable_cover/0, report_cover/0]).
|
||||
-export([with_exit_handler/2]).
|
||||
-export([throw_on_error/2, with_exit_handler/2]).
|
||||
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
|
||||
-export([execute_mnesia_transaction/1]).
|
||||
-export([ensure_ok/2]).
|
||||
|
|
@ -77,6 +77,8 @@
|
|||
-spec(rs/1 :: (r(atom())) -> string()).
|
||||
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
|
||||
-spec(report_cover/0 :: () -> 'ok').
|
||||
-spec(throw_on_error/2 ::
|
||||
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
|
||||
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
|
||||
-spec(with_user/2 :: (username(), thunk(A)) -> A).
|
||||
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
|
||||
|
|
@ -198,6 +200,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
|
|||
end,
|
||||
Mod]).
|
||||
|
||||
throw_on_error(E, Thunk) ->
|
||||
case Thunk() of
|
||||
{error, Reason} -> throw({E, Reason});
|
||||
{ok, Res} -> Res;
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
with_exit_handler(Handler, Thunk) ->
|
||||
try
|
||||
Thunk()
|
||||
|
|
|
|||
|
|
@ -166,14 +166,27 @@ teardown_profiling(Value) ->
|
|||
fprof:analyse([{dest, []}, {cols, 100}])
|
||||
end.
|
||||
|
||||
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
|
||||
|
||||
peername(Sock) ->
|
||||
try
|
||||
{Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
|
||||
AddressS = inet_parse:ntoa(Address),
|
||||
{AddressS, Port}
|
||||
catch
|
||||
Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
|
||||
[self(), Ex]),
|
||||
rabbit_log:info("closing TCP connection ~p", [self()]),
|
||||
exit(normal)
|
||||
end.
|
||||
|
||||
start_connection(Parent, Deb, ClientSock) ->
|
||||
ProfilingValue = setup_profiling(),
|
||||
process_flag(trap_exit, true),
|
||||
{ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock),
|
||||
PeerAddressS = inet_parse:ntoa(PeerAddress),
|
||||
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
|
||||
[self(), PeerAddressS, PeerPort]),
|
||||
{PeerAddressS, PeerPort} = peername(ClientSock),
|
||||
ProfilingValue = setup_profiling(),
|
||||
try
|
||||
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
|
||||
[self(), PeerAddressS, PeerPort]),
|
||||
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
|
||||
handshake_timeout),
|
||||
mainloop(Parent, Deb, switch_callback(
|
||||
|
|
@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
|
|||
end.
|
||||
|
||||
switch_callback(OldState, NewCallback, Length) ->
|
||||
{ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1),
|
||||
Ref = inet_op(fun () -> prim_inet:async_recv(
|
||||
OldState#v1.sock, Length, -1) end),
|
||||
OldState#v1{callback = NewCallback,
|
||||
recv_ref = Ref}.
|
||||
|
||||
|
|
@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
|
|||
end;
|
||||
|
||||
handle_input(handshake, Other, #v1{sock = Sock}) ->
|
||||
ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>),
|
||||
ok = inet_op(fun () -> gen_tcp:send(
|
||||
Sock, <<"AMQP",1,1,
|
||||
?PROTOCOL_VERSION_MAJOR,
|
||||
?PROTOCOL_VERSION_MINOR>>) end),
|
||||
throw({bad_header, Other});
|
||||
|
||||
handle_input(Callback, Data, _State) ->
|
||||
|
|
|
|||
|
|
@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
|
|||
fun (QPid, {Routed, Handled}) ->
|
||||
case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
|
||||
Txn, Message, QPid) of
|
||||
true -> {true, [QPid | Handled]};
|
||||
false -> {true, Handled};
|
||||
{'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n",
|
||||
[QPid, Reason]),
|
||||
{Routed, Handled}
|
||||
true -> {true, [QPid | Handled]};
|
||||
false -> {true, Handled};
|
||||
{'EXIT', _Reason} -> {Routed, Handled}
|
||||
end
|
||||
end,
|
||||
{false, []},
|
||||
|
|
|
|||
|
|
@ -36,6 +36,8 @@
|
|||
|
||||
-record(wstate, {sock, channel, frame_max}).
|
||||
|
||||
-define(HIBERNATE_AFTER, 5000).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
|
@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) ->
|
|||
mainloop(State) ->
|
||||
receive
|
||||
Message -> ?MODULE:mainloop(handle_message(Message, State))
|
||||
after ?HIBERNATE_AFTER ->
|
||||
erlang:hibernate(?MODULE, mainloop, [State])
|
||||
end.
|
||||
|
||||
handle_message({send_command, MethodRecord},
|
||||
|
|
@ -127,12 +131,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
|
|||
Channel, Content, FrameMax),
|
||||
[MethodFrame | ContentFrames].
|
||||
|
||||
tcp_send(Sock, Data) ->
|
||||
rabbit_misc:throw_on_error(inet_error,
|
||||
fun () -> gen_tcp:send(Sock, Data) end).
|
||||
|
||||
internal_send_command(Sock, Channel, MethodRecord) ->
|
||||
ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)).
|
||||
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
|
||||
|
||||
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
|
||||
ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord,
|
||||
Content, FrameMax)).
|
||||
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
|
||||
Content, FrameMax)).
|
||||
|
||||
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
|
||||
%% Status} to obtain the result. That is bad when it is called from
|
||||
|
|
|
|||
Loading…
Reference in New Issue