clean up tx records in queues when a transaction's channel dies
Previously queues were only monitoring channels with subscribers or to which ack-requiring messages had been delivered. Now queues also monitor channels from which they have received transactional publishes. Queues record the last tx id they have seen from a channel. This then makes it easy and efficient to find the associated tx record in the queue's process dictionary when a channel process dies - the alternative would be to scan the tx records for matching channel pids - and perform the required rollback activities for the tx.
This commit is contained in:
		
							parent
							
								
									eb638c17b4
								
							
						
					
					
						commit
						1b43c217f4
					
				| 
						 | 
				
			
			@ -64,7 +64,7 @@
 | 
			
		|||
 | 
			
		||||
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
 | 
			
		||||
 | 
			
		||||
-record(delivery, {mandatory, immediate, txn, message}).
 | 
			
		||||
-record(delivery, {mandatory, immediate, txn, sender, message}).
 | 
			
		||||
 | 
			
		||||
%%----------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -140,6 +140,7 @@
 | 
			
		|||
      #delivery{mandatory :: bool(),
 | 
			
		||||
                immediate :: bool(),
 | 
			
		||||
                txn       :: maybe(txn()),
 | 
			
		||||
                sender    :: pid(),
 | 
			
		||||
                message   :: message()}).
 | 
			
		||||
%% this really should be an abstract type
 | 
			
		||||
-type(msg_id() :: non_neg_integer()).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -241,14 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
 | 
			
		|||
 | 
			
		||||
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
 | 
			
		||||
 | 
			
		||||
deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) ->
 | 
			
		||||
    gen_server2:call(QPid, {deliver_immediately, Txn, Message},
 | 
			
		||||
deliver(QPid, #delivery{immediate = true,
 | 
			
		||||
                        txn = Txn, sender = ChPid, message = Message}) ->
 | 
			
		||||
    gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
 | 
			
		||||
                     infinity);
 | 
			
		||||
deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) ->
 | 
			
		||||
    gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
 | 
			
		||||
deliver(QPid, #delivery{mandatory = true,
 | 
			
		||||
                        txn = Txn, sender = ChPid, message = Message}) ->
 | 
			
		||||
    gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
 | 
			
		||||
    true;
 | 
			
		||||
deliver(QPid, #delivery{txn = Txn, message = Message}) ->
 | 
			
		||||
    gen_server2:cast(QPid, {deliver, Txn, Message}),
 | 
			
		||||
deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
 | 
			
		||||
    gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
redeliver(QPid, Messages) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -66,6 +66,7 @@
 | 
			
		|||
             monitor_ref,
 | 
			
		||||
             unacked_messages,
 | 
			
		||||
             is_limit_active,
 | 
			
		||||
             txn,
 | 
			
		||||
             unsent_message_count}).
 | 
			
		||||
 | 
			
		||||
-define(INFO_KEYS,
 | 
			
		||||
| 
						 | 
				
			
			@ -133,6 +134,7 @@ ch_record(ChPid) ->
 | 
			
		|||
                    monitor_ref = MonitorRef,
 | 
			
		||||
                    unacked_messages = dict:new(),
 | 
			
		||||
                    is_limit_active = false,
 | 
			
		||||
                    txn = none,
 | 
			
		||||
                    unsent_message_count = 0},
 | 
			
		||||
            put(Key, C),
 | 
			
		||||
            C;
 | 
			
		||||
| 
						 | 
				
			
			@ -156,6 +158,11 @@ ch_record_state_transition(OldCR, NewCR) ->
 | 
			
		|||
       true                               -> ok
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
record_current_channel_tx(ChPid, Txn) ->
 | 
			
		||||
    %% as a side effect this also starts monitoring the channel (if
 | 
			
		||||
    %% that wasn't happening already)
 | 
			
		||||
    store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
 | 
			
		||||
    
 | 
			
		||||
deliver_immediately(Message, Delivered,
 | 
			
		||||
                    State = #q{q = #amqqueue{name = QName},
 | 
			
		||||
                               round_robin = RoundRobin,
 | 
			
		||||
| 
						 | 
				
			
			@ -198,7 +205,7 @@ deliver_immediately(Message, Delivered,
 | 
			
		|||
            {not_offered, State}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
attempt_delivery(none, Message, State) ->
 | 
			
		||||
attempt_delivery(none, _ChPid, Message, State) ->
 | 
			
		||||
    case deliver_immediately(Message, false, State) of
 | 
			
		||||
        {offered, false, State1} ->
 | 
			
		||||
            {true, State1};
 | 
			
		||||
| 
						 | 
				
			
			@ -209,13 +216,13 @@ attempt_delivery(none, Message, State) ->
 | 
			
		|||
        {not_offered, State1} ->
 | 
			
		||||
            {false, State1}
 | 
			
		||||
    end;
 | 
			
		||||
attempt_delivery(Txn, Message, State) ->
 | 
			
		||||
attempt_delivery(Txn, ChPid, Message, State) ->
 | 
			
		||||
    persist_message(Txn, qname(State), Message),
 | 
			
		||||
    record_pending_message(Txn, Message),
 | 
			
		||||
    record_pending_message(Txn, ChPid, Message),
 | 
			
		||||
    {true, State}.
 | 
			
		||||
 | 
			
		||||
deliver_or_enqueue(Txn, Message, State) ->
 | 
			
		||||
    case attempt_delivery(Txn, Message, State) of
 | 
			
		||||
deliver_or_enqueue(Txn, ChPid, Message, State) ->
 | 
			
		||||
    case attempt_delivery(Txn, ChPid, Message, State) of
 | 
			
		||||
        {true, NewState} ->
 | 
			
		||||
            {true, NewState};
 | 
			
		||||
        {false, NewState} ->
 | 
			
		||||
| 
						 | 
				
			
			@ -295,10 +302,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
 | 
			
		|||
                                   round_robin = ActiveConsumers}) ->
 | 
			
		||||
    case lookup_ch(DownPid) of
 | 
			
		||||
        not_found -> noreply(State);
 | 
			
		||||
        #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
 | 
			
		||||
        #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
 | 
			
		||||
            unacked_messages = UAM} ->
 | 
			
		||||
            NewActive = block_consumers(ChPid, ActiveConsumers),
 | 
			
		||||
            erlang:demonitor(MonitorRef),
 | 
			
		||||
            erase({ch, ChPid}),
 | 
			
		||||
            case Txn of
 | 
			
		||||
                none -> ok;
 | 
			
		||||
                _    -> ok = rollback_work(Txn, qname(State)),
 | 
			
		||||
                        erase_tx(Txn)
 | 
			
		||||
            end,
 | 
			
		||||
            case check_auto_delete(
 | 
			
		||||
                   deliver_or_enqueue_n(
 | 
			
		||||
                     [{Message, true} ||
 | 
			
		||||
| 
						 | 
				
			
			@ -456,13 +469,17 @@ is_tx_persistent(Txn) ->
 | 
			
		|||
    #tx{is_persistent = Res} = lookup_tx(Txn),
 | 
			
		||||
    Res.
 | 
			
		||||
 | 
			
		||||
record_pending_message(Txn, Message) ->
 | 
			
		||||
record_pending_message(Txn, ChPid, Message) ->
 | 
			
		||||
    Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
 | 
			
		||||
    store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
 | 
			
		||||
    record_current_channel_tx(ChPid, Txn),
 | 
			
		||||
    store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
 | 
			
		||||
                        ch_pid = ChPid}).
 | 
			
		||||
 | 
			
		||||
record_pending_acks(Txn, ChPid, MsgIds) ->
 | 
			
		||||
    Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
 | 
			
		||||
    store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
 | 
			
		||||
    record_current_channel_tx(ChPid, Txn),
 | 
			
		||||
    store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
 | 
			
		||||
                        ch_pid = ChPid}).
 | 
			
		||||
 | 
			
		||||
process_pending(Txn, State) ->
 | 
			
		||||
    #tx{ch_pid = ChPid,
 | 
			
		||||
| 
						 | 
				
			
			@ -541,7 +558,7 @@ handle_call({info, Items}, _From, State) ->
 | 
			
		|||
    catch Error -> reply({error, Error}, State)
 | 
			
		||||
    end;
 | 
			
		||||
 | 
			
		||||
handle_call({deliver_immediately, Txn, Message}, _From, State) ->
 | 
			
		||||
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
 | 
			
		||||
    %% Synchronous, "immediate" delivery mode
 | 
			
		||||
    %%
 | 
			
		||||
    %% FIXME: Is this correct semantics?
 | 
			
		||||
| 
						 | 
				
			
			@ -555,12 +572,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
 | 
			
		|||
    %% just all ready-to-consume queues get the message, with unready
 | 
			
		||||
    %% queues discarding the message?
 | 
			
		||||
    %%
 | 
			
		||||
    {Delivered, NewState} = attempt_delivery(Txn, Message, State),
 | 
			
		||||
    {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
 | 
			
		||||
    reply(Delivered, NewState);
 | 
			
		||||
 | 
			
		||||
handle_call({deliver, Txn, Message}, _From, State) ->
 | 
			
		||||
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
 | 
			
		||||
    %% Synchronous, "mandatory" delivery mode
 | 
			
		||||
    {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
 | 
			
		||||
    {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
 | 
			
		||||
    reply(Delivered, NewState);
 | 
			
		||||
 | 
			
		||||
handle_call({commit, Txn}, From, State) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -711,9 +728,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
 | 
			
		|||
            reply(locked, State)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
handle_cast({deliver, Txn, Message}, State) ->
 | 
			
		||||
handle_cast({deliver, Txn, Message, ChPid}, State) ->
 | 
			
		||||
    %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
 | 
			
		||||
    {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
 | 
			
		||||
    {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
 | 
			
		||||
    noreply(NewState);
 | 
			
		||||
 | 
			
		||||
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -61,7 +61,7 @@ publish(Delivery = #delivery{
 | 
			
		|||
 | 
			
		||||
delivery(Mandatory, Immediate, Txn, Message) ->
 | 
			
		||||
    #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
 | 
			
		||||
              message = Message}.
 | 
			
		||||
              sender = self(), message = Message}.
 | 
			
		||||
 | 
			
		||||
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
 | 
			
		||||
    {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue