Merged default into 18776
This commit is contained in:
commit
6fba52a764
|
|
@ -28,7 +28,10 @@
|
|||
[ "x" = "x$NODE_IP_ADDRESS" ] && NODE_IP_ADDRESS=0.0.0.0
|
||||
[ "x" = "x$NODE_PORT" ] && NODE_PORT=5672
|
||||
|
||||
ERL_ARGS="+K true +A30 -kernel inet_default_listen_options [{sndbuf,16384},{recbuf,4096}]"
|
||||
ERL_ARGS="+K true +A30 \
|
||||
-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
|
||||
-kernel inet_default_connect_options [{nodelay,true}]"
|
||||
|
||||
CLUSTER_CONFIG_FILE=/etc/default/rabbitmq_cluster.config
|
||||
|
||||
[ "x" = "x$LOG_BASE" ] && LOG_BASE=/var/log/rabbitmq
|
||||
|
|
|
|||
|
|
@ -99,7 +99,8 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia
|
|||
-s rabbit ^
|
||||
+W w ^
|
||||
+A30 ^
|
||||
-kernel inet_default_listen_options "[{sndbuf, 16384}, {recbuf, 4096}]" ^
|
||||
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
|
||||
-kernel inet_default_connect_options "[{nodelay, true}]" ^
|
||||
-rabbit tcp_listeners "[{\"%NODE_IP_ADDRESS%\", %NODE_PORT%}]" ^
|
||||
-kernel error_logger {file,\""%LOG_BASE%/%NODENAME%.log"\"} ^
|
||||
-sasl errlog_type error ^
|
||||
|
|
|
|||
|
|
@ -28,11 +28,11 @@
|
|||
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
|
||||
-export([pseudo_queue/2]).
|
||||
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
|
||||
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
|
||||
commit/2, rollback/2]).
|
||||
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
|
||||
-export([claim_queue/2]).
|
||||
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
|
||||
-export([notify_sent/2, notify_down/2]).
|
||||
-export([notify_sent/2]).
|
||||
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
|
||||
-export([on_node_down/1]).
|
||||
|
||||
-import(mnesia).
|
||||
|
|
@ -43,6 +43,8 @@
|
|||
-include("rabbit.hrl").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
-define(CALL_TIMEOUT, 5000).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
|
@ -50,6 +52,8 @@
|
|||
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
|
||||
-type(qlen() :: {'ok', non_neg_integer()}).
|
||||
-type(qfun(A) :: fun ((amqqueue()) -> A)).
|
||||
-type(ok_or_errors() ::
|
||||
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
|
||||
-spec(start/0 :: () -> 'ok').
|
||||
-spec(recover/0 :: () -> 'ok').
|
||||
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
|
||||
|
|
@ -72,9 +76,9 @@
|
|||
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
|
||||
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
|
||||
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
|
||||
-spec(commit/2 :: (pid(), txn()) -> 'ok').
|
||||
-spec(rollback/2 :: (pid(), txn()) -> 'ok').
|
||||
-spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
|
||||
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
|
||||
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
|
||||
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
|
||||
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
|
||||
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
|
||||
{'ok', non_neg_integer(), msg()} | 'empty').
|
||||
|
|
@ -210,14 +214,29 @@ requeue(QPid, MsgIds, ChPid) ->
|
|||
ack(QPid, Txn, MsgIds, ChPid) ->
|
||||
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
|
||||
|
||||
commit(QPid, Txn) ->
|
||||
gen_server:call(QPid, {commit, Txn}).
|
||||
commit_all(QPids, Txn) ->
|
||||
Timeout = length(QPids) * ?CALL_TIMEOUT,
|
||||
safe_pmap_ok(
|
||||
fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
|
||||
QPids).
|
||||
|
||||
rollback(QPid, Txn) ->
|
||||
gen_server:cast(QPid, {rollback, Txn}).
|
||||
rollback_all(QPids, Txn) ->
|
||||
safe_pmap_ok(
|
||||
fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
|
||||
QPids).
|
||||
|
||||
notify_down(#amqqueue{ pid = QPid }, ChPid) ->
|
||||
gen_server:call(QPid, {notify_down, ChPid}).
|
||||
notify_down_all(QPids, ChPid) ->
|
||||
Timeout = length(QPids) * ?CALL_TIMEOUT,
|
||||
safe_pmap_ok(
|
||||
fun (QPid) ->
|
||||
rabbit_misc:with_exit_handler(
|
||||
%% we don't care if the queue process has terminated
|
||||
%% in the meantime
|
||||
fun () -> ok end,
|
||||
fun () -> gen_server:call(QPid, {notify_down, ChPid},
|
||||
Timeout) end)
|
||||
end,
|
||||
QPids).
|
||||
|
||||
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
|
||||
gen_server:call(QPid, {claim_queue, ReaderPid}).
|
||||
|
|
@ -270,3 +289,16 @@ pseudo_queue(QueueName, Pid) ->
|
|||
auto_delete = false,
|
||||
arguments = [],
|
||||
pid = Pid}.
|
||||
|
||||
safe_pmap_ok(F, L) ->
|
||||
case [R || R <- rabbit_misc:upmap(
|
||||
fun (V) ->
|
||||
try F(V)
|
||||
catch Class:Reason -> {Class, Reason}
|
||||
end
|
||||
end, L),
|
||||
R =/= ok] of
|
||||
[] -> ok;
|
||||
Errors -> {error, Errors}
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
|
|||
|
||||
make_tx_id() -> rabbit_misc:guid().
|
||||
|
||||
safe_pmap_set_ok(F, S) ->
|
||||
case lists:filter(fun (R) -> R =/= ok end,
|
||||
rabbit_misc:upmap(
|
||||
fun (V) ->
|
||||
try F(V)
|
||||
catch Class:Reason -> {Class, Reason}
|
||||
end
|
||||
end, sets:to_list(S))) of
|
||||
[] -> ok;
|
||||
Errors -> {error, Errors}
|
||||
end.
|
||||
|
||||
notify_participants(F, TxnKey, Participants) ->
|
||||
safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
|
||||
|
||||
new_tx(State) ->
|
||||
State#ch{transaction_id = make_tx_id(),
|
||||
tx_participants = sets:new(),
|
||||
|
|
@ -729,8 +714,8 @@ new_tx(State) ->
|
|||
|
||||
internal_commit(State = #ch{transaction_id = TxnKey,
|
||||
tx_participants = Participants}) ->
|
||||
case notify_participants(fun rabbit_amqqueue:commit/2,
|
||||
TxnKey, Participants) of
|
||||
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
|
||||
TxnKey) of
|
||||
ok -> new_tx(State);
|
||||
{error, Errors} -> exit({commit_failed, Errors})
|
||||
end.
|
||||
|
|
@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
|
|||
[self(),
|
||||
queue:len(UAQ),
|
||||
queue:len(UAMQ)]),
|
||||
case notify_participants(fun rabbit_amqqueue:rollback/2,
|
||||
TxnKey, Participants) of
|
||||
case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
|
||||
TxnKey) of
|
||||
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
|
||||
new_tx(State#ch{unacked_message_q = NewUAMQ});
|
||||
{error, Errors} -> exit({rollback_failed, Errors})
|
||||
|
|
@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) ->
|
|||
Acc0, D).
|
||||
|
||||
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
|
||||
safe_pmap_set_ok(
|
||||
fun (QueueName) ->
|
||||
case rabbit_amqqueue:with(
|
||||
QueueName,
|
||||
fun (Q) ->
|
||||
rabbit_amqqueue:notify_down(Q, ProxyPid)
|
||||
end) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, not_found} ->
|
||||
%% queue has been deleted in the meantime
|
||||
ok
|
||||
end
|
||||
end,
|
||||
dict:fold(fun (_ConsumerTag, QueueName, S) ->
|
||||
sets:add_element(QueueName, S)
|
||||
end, sets:new(), Consumers)).
|
||||
rabbit_amqqueue:notify_down_all(
|
||||
[QPid || QueueName <-
|
||||
sets:to_list(
|
||||
dict:fold(fun (_ConsumerTag, QueueName, S) ->
|
||||
sets:add_element(QueueName, S)
|
||||
end, sets:new(), Consumers)),
|
||||
case rabbit_amqqueue:lookup(QueueName) of
|
||||
{ok, Q} -> QPid = Q#amqqueue.pid, true;
|
||||
%% queue has been deleted in the meantime
|
||||
{error, not_found} -> QPid = none, false
|
||||
end],
|
||||
ProxyPid).
|
||||
|
||||
is_message_persistent(#content{properties = #'P_basic'{
|
||||
delivery_mode = Mode}}) ->
|
||||
|
|
|
|||
|
|
@ -51,7 +51,6 @@
|
|||
not_found() | {'error', 'unroutable' | 'not_delivered'}).
|
||||
-type(bind_res() :: 'ok' |
|
||||
{'error', 'queue_not_found' | 'exchange_not_found'}).
|
||||
|
||||
-spec(recover/0 :: () -> 'ok').
|
||||
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
|
||||
amqp_table()) -> exchange()).
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@
|
|||
%% all states, unless specified otherwise:
|
||||
%% socket error -> *exit*
|
||||
%% socket close -> *throw*
|
||||
%% writer send failure -> *throw*
|
||||
%% forced termination -> *exit*
|
||||
%% handshake_timeout -> *throw*
|
||||
%% pre-init:
|
||||
|
|
@ -93,10 +94,18 @@
|
|||
%% terminate_channel timeout -> remove 'closing' mark, *closing*
|
||||
%% handshake_timeout -> ignore, *closing*
|
||||
%% heartbeat timeout -> *throw*
|
||||
%% channel exit ->
|
||||
%% if abnormal exit then log error
|
||||
%% if last channel to exit then send connection.close_ok, start
|
||||
%% terminate_connection timer, *closing*
|
||||
%% channel exit with hard error
|
||||
%% -> log error, wait for channels to terminate forcefully, start
|
||||
%% terminate_connection timer, send close, *closed*
|
||||
%% channel exit with soft error
|
||||
%% -> log error, start terminate_channel timer, mark channel as
|
||||
%% closing
|
||||
%% if last channel to exit then send connection.close_ok,
|
||||
%% start terminate_connection timer, *closed*
|
||||
%% else *closing*
|
||||
%% channel exits normally
|
||||
%% -> if last channel to exit then send connection.close_ok,
|
||||
%% start terminate_connection timer, *closed*
|
||||
%% closed:
|
||||
%% socket close -> *terminate*
|
||||
%% receive connection.close_ok -> self() ! terminate_connection,
|
||||
|
|
@ -230,6 +239,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
|
|||
%% since this termination is initiated by our parent it is
|
||||
%% probably more important to exit quickly.
|
||||
exit(Reason);
|
||||
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
|
||||
throw(E);
|
||||
{'EXIT', Pid, Reason} ->
|
||||
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
|
||||
{terminate_channel, Channel, Ref1} ->
|
||||
|
|
@ -288,24 +299,13 @@ terminate_channel(Channel, Ref, State) ->
|
|||
end,
|
||||
State.
|
||||
|
||||
handle_dependent_exit(Pid, Reason,
|
||||
State = #v1{connection_state = closing}) ->
|
||||
case channel_cleanup(Pid) of
|
||||
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
|
||||
Channel ->
|
||||
case Reason of
|
||||
normal -> ok;
|
||||
_ -> log_channel_error(closing, Channel, Reason)
|
||||
end,
|
||||
maybe_close(State)
|
||||
end;
|
||||
handle_dependent_exit(Pid, normal, State) ->
|
||||
channel_cleanup(Pid),
|
||||
State;
|
||||
maybe_close(State);
|
||||
handle_dependent_exit(Pid, Reason, State) ->
|
||||
case channel_cleanup(Pid) of
|
||||
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
|
||||
Channel -> handle_exception(State, Channel, Reason)
|
||||
Channel -> maybe_close(handle_exception(State, Channel, Reason))
|
||||
end.
|
||||
|
||||
channel_cleanup(Pid) ->
|
||||
|
|
@ -362,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) ->
|
|||
exit(channel_termination_timeout)
|
||||
end.
|
||||
|
||||
maybe_close(State) ->
|
||||
maybe_close(State = #v1{connection_state = closing}) ->
|
||||
case all_channels() of
|
||||
[] -> ok = send_on_channel0(
|
||||
State#v1.sock, #'connection.close_ok'{}),
|
||||
close_connection(State);
|
||||
_ -> State
|
||||
end.
|
||||
end;
|
||||
maybe_close(State) ->
|
||||
State.
|
||||
|
||||
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
|
||||
when CS =:= closing; CS =:= closed ->
|
||||
|
|
|
|||
|
|
@ -153,10 +153,15 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
|
|||
%% when these are full. So the fact that we process the result
|
||||
%% asynchronously does not impact flow control.
|
||||
internal_send_command_async(Sock, Channel, MethodRecord) ->
|
||||
true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)),
|
||||
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
|
||||
ok.
|
||||
|
||||
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
|
||||
true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord,
|
||||
Content, FrameMax)),
|
||||
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
|
||||
Content, FrameMax)),
|
||||
ok.
|
||||
|
||||
port_cmd(Sock, Data) ->
|
||||
try erlang:port_command(Sock, Data)
|
||||
catch error:Error -> exit({writer, send_failed, Error})
|
||||
end.
|
||||
|
|
|
|||
Loading…
Reference in New Issue