From eb638c17b4ef2c97ae37962e633584d269700d3d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 5 Jun 2009 08:10:26 +0100 Subject: [PATCH 1/2] refactoring: bundle up all the data for a publish Passing this around as separate args was becoming a pain. Also, now it's easier to add more data items. --- include/rabbit.hrl | 7 +++++ src/rabbit_amqqueue.erl | 13 ++++---- src/rabbit_basic.erl | 16 ++++++---- src/rabbit_channel.erl | 5 ++-- src/rabbit_error_logger.erl | 10 ++++--- src/rabbit_exchange.erl | 20 +++++-------- src/rabbit_router.erl | 59 ++++++++++++++++--------------------- 7 files changed, 66 insertions(+), 64 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ffda069840..5ebc82a2be 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,6 +64,8 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(delivery, {mandatory, immediate, txn, message}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -134,6 +136,11 @@ content :: content(), persistent_key :: maybe(pkey())}). -type(message() :: basic_message()). +-type(delivery() :: + #delivery{mandatory :: bool(), + immediate :: bool(), + txn :: maybe(txn()), + message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). -type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d5f1902652..64f078bd61 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). @@ -85,7 +85,7 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). +-spec(deliver/2 :: (pid(), delivery()) -> bool()). -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). @@ -241,12 +241,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); -deliver(true, _IsImmediate, Txn, Message, QPid) -> +deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, + infinity); +deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) -> gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; -deliver(false, _IsImmediate, Txn, Message, QPid) -> +deliver(QPid, #delivery{txn = Txn, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message}), true. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b2e858208e..ce4f818d0a 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,14 +33,15 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, message/4]). +-export([publish/1, message/4, delivery/4]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) -> +-spec(publish/1 :: (delivery()) -> {ok, routing_result(), [pid()]} | not_found()). +-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> message()). @@ -48,17 +49,20 @@ %%---------------------------------------------------------------------------- -publish(Mandatory, Immediate, Txn, - Message = #basic_message{exchange_name = ExchangeName}) -> +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of {ok, X} -> - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message), + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), {ok, RoutingRes, DeliveredQPids}; Other -> Other end. +delivery(Mandatory, Immediate, Txn, Message) -> + #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, + message = Message}. + message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 84b414fd7d..3089bb6293 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -324,8 +324,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, persistent_key = PersistentKey}, {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, - Message), + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of routed -> ok; diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index d73edb73b8..76016a8cb2 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(false, false, none, - rabbit_basic:message( - LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data)))), + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index ca0e337b84..7d9948f06f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/5]). + publish/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -72,8 +72,7 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Mandatory, Immediate, Txn, Message) -> - publish(X, [], Mandatory, Immediate, Txn, Message). +publish(X, Delivery) -> + publish(X, [], Delivery). -publish(X, Seen, Mandatory, Immediate, Txn, - Message = #basic_message{routing_key = RK, content = C}) -> - case rabbit_router:deliver(route(X, RK, C), - Mandatory, Immediate, Txn, Message) of +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn, false -> case lookup(AName) of {ok, AX} -> - publish(AX, NewSeen, - Mandatory, Immediate, Txn, - Message); + publish(AX, NewSeen, Delivery); {error, not_found} -> rabbit_log:warning( "alternate exchange for ~s " diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 57166428bf..10f80cc301 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). -export([start_link/0, - deliver/5]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,8 +50,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. @@ -62,13 +61,13 @@ start_link() -> -ifdef(BUG19758). -deliver(QPids, Mandatory, Immediate, Txn, Message) -> - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)). +deliver(QPids, Delivery) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)). -else. -deliver(QPids, Mandatory, Immediate, Txn, Message) -> +deliver(QPids, Delivery) -> %% we reduce inter-node traffic by grouping the qpids by node and %% only delivering one copy of the message to each node involved, %% which then in turn delivers it to its queues. @@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) -> [QPid], D) end, dict:new(), QPids)), - Mandatory, Immediate, Txn, Message). + Delivery). -deliver_per_node([{Node, QPids}], Mandatory, Immediate, - Txn, Message) - when Node == node() -> +deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> %% optimisation - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)); -deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, - Txn, Message) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)); +deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver in run_bindings below will deliver the %% message to the queue process asynchronously, and return true, @@ -101,20 +98,16 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, {routed, lists:flatmap( fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), + gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), QPids end, NodeQPids)}; -deliver_per_node(NodeQPids, Mandatory, Immediate, - Txn, Message) -> +deliver_per_node(NodeQPids, Delivery) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}, - infinity) + try gen_server2:call({?SERVER, Node}, + {deliver, QPids, Delivery}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -131,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, end, {false, []}, R), - check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, lists:append(Handled)}). -endif. @@ -140,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, init([]) -> {ok, no_state}. -handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, - From, State) -> +handle_call({deliver, QPids, Delivery}, From, State) -> spawn( fun () -> - R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), + R = run_bindings(QPids, Delivery), gen_server2:reply(From, R) end), {noreply, State}. -handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, - State) -> +handle_cast({deliver, QPids, Delivery}, State) -> %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Mandatory, Immediate, Txn, Message), + run_bindings(QPids, Delivery), {noreply, State}. handle_info(_Info, State) -> @@ -166,11 +158,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> +run_bindings(QPids, Delivery) -> lists:foldl( fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, - Txn, Message, QPid) of + case catch rabbit_amqqueue:deliver(QPid, Delivery) of true -> {true, [QPid | Handled]}; false -> {true, Handled}; {'EXIT', _Reason} -> {Routed, Handled} From 1b43c217f415bc0cdd976229a2abcb18a8374c62 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 5 Jun 2009 08:10:51 +0100 Subject: [PATCH 2/2] clean up tx records in queues when a transaction's channel dies Previously queues were only monitoring channels with subscribers or to which ack-requiring messages had been delivered. Now queues also monitor channels from which they have received transactional publishes. Queues record the last tx id they have seen from a channel. This then makes it easy and efficient to find the associated tx record in the queue's process dictionary when a channel process dies - the alternative would be to scan the tx records for matching channel pids - and perform the required rollback activities for the tx. --- include/rabbit.hrl | 3 ++- src/rabbit_amqqueue.erl | 14 +++++----- src/rabbit_amqqueue_process.erl | 47 ++++++++++++++++++++++----------- src/rabbit_basic.erl | 2 +- 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ebc82a2be..784c21b39d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,7 +64,7 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). --record(delivery, {mandatory, immediate, txn, message}). +-record(delivery, {mandatory, immediate, txn, sender, message}). %%---------------------------------------------------------------------------- @@ -140,6 +140,7 @@ #delivery{mandatory :: bool(), immediate :: bool(), txn :: maybe(txn()), + sender :: pid(), message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 64f078bd61..198e2782b4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -241,14 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, +deliver(QPid, #delivery{immediate = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, infinity); -deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message}, infinity), +deliver(QPid, #delivery{mandatory = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), true; -deliver(QPid, #delivery{txn = Txn, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. redeliver(QPid, Messages) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7e4..6027c9c04c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,6 +66,7 @@ monitor_ref, unacked_messages, is_limit_active, + txn, unsent_message_count}). -define(INFO_KEYS, @@ -133,6 +134,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, unacked_messages = dict:new(), is_limit_active = false, + txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -156,6 +158,11 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. +record_current_channel_tx(ChPid, Txn) -> + %% as a side effect this also starts monitoring the channel (if + %% that wasn't happening already) + store_ch_record((ch_record(ChPid))#cr{txn = Txn}). + deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, @@ -198,7 +205,7 @@ deliver_immediately(Message, Delivered, {not_offered, State} end. -attempt_delivery(none, Message, State) -> +attempt_delivery(none, _ChPid, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; @@ -209,13 +216,13 @@ attempt_delivery(none, Message, State) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, Message, State) -> +attempt_delivery(Txn, ChPid, Message, State) -> persist_message(Txn, qname(State), Message), - record_pending_message(Txn, Message), + record_pending_message(Txn, ChPid, Message), {true, State}. -deliver_or_enqueue(Txn, Message, State) -> - case attempt_delivery(Txn, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, State) -> + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -295,10 +302,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of not_found -> noreply(State); - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, + unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), + case Txn of + none -> ok; + _ -> ok = rollback_work(Txn, qname(State)), + erase_tx(Txn) + end, case check_auto_delete( deliver_or_enqueue_n( [{Message, true} || @@ -456,13 +469,17 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, Message) -> +record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], + ch_pid = ChPid}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], + ch_pid = ChPid}). process_pending(Txn, State) -> #tx{ch_pid = ChPid, @@ -541,7 +558,7 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call({deliver_immediately, Txn, Message}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -555,12 +572,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({deliver, Txn, Message}, _From, State) -> +handle_call({deliver, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> @@ -711,9 +728,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({deliver, Txn, Message}, State) -> +handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index ce4f818d0a..761b3863b4 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -61,7 +61,7 @@ publish(Delivery = #delivery{ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - message = Message}. + sender = self(), message = Message}. message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),