Synchronous RPC requests are now queued up and dispatched serially
This commit is contained in:
		
							parent
							
								
									95ea5768f0
								
							
						
					
					
						commit
						00f52e9b86
					
				| 
						 | 
				
			
			@ -42,7 +42,7 @@
 | 
			
		|||
                        reader_pid,
 | 
			
		||||
                        writer_pid,
 | 
			
		||||
                        do2, do3,
 | 
			
		||||
                        pending_rpc,
 | 
			
		||||
                        rpc_requests = queue:new(),
 | 
			
		||||
                        pending_consumer,
 | 
			
		||||
                        closing = false,
 | 
			
		||||
                        return_handler_pid,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -106,22 +106,18 @@ register_return_handler(Channel, ReturnHandler) ->
 | 
			
		|||
%---------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer,
 | 
			
		||||
                                                  pending_rpc = PendingRpc,
 | 
			
		||||
                                                  do2 = Do2}) ->
 | 
			
		||||
	% Check whether there is an outstanding RPC request
 | 
			
		||||
	case PendingRpc of
 | 
			
		||||
		{Pid,Ref} ->
 | 
			
		||||
            exit(illegal_pending_rpc);
 | 
			
		||||
		Other ->
 | 
			
		||||
			ok
 | 
			
		||||
	end,		
 | 
			
		||||
    NewState = State#channel_state{pending_rpc = From},
 | 
			
		||||
                                                  rpc_requests = RequestQueue,
 | 
			
		||||
                                                  do2 = Do2}) ->	
 | 
			
		||||
	% Enqueue the incoming RPC request to serialize RPC dispatching
 | 
			
		||||
	NewRequestQueue = queue:in(From, RequestQueue),
 | 
			
		||||
    NewState = State#channel_state{rpc_requests = NewRequestQueue},
 | 
			
		||||
    Do2(Writer,Method),
 | 
			
		||||
    {noreply, NewState}.
 | 
			
		||||
 | 
			
		||||
rpc_bottom_half(Reply, State = #channel_state{pending_rpc = From}) ->
 | 
			
		||||
rpc_bottom_half(Reply, State = #channel_state{rpc_requests = RequestQueue}) ->
 | 
			
		||||
	{{value, From}, NewRequestQueue} = queue:out(RequestQueue),
 | 
			
		||||
    gen_server:reply(From, Reply),
 | 
			
		||||
    NewState = State#channel_state{pending_rpc = <<>>},
 | 
			
		||||
    NewState = State#channel_state{rpc_requests = NewRequestQueue},
 | 
			
		||||
    {noreply, NewState}.
 | 
			
		||||
 | 
			
		||||
resolve_consumer(ConsumerTag, #channel_state{consumers = []}) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -35,6 +35,8 @@
 | 
			
		|||
-record(publish,{q, x, routing_key, bind_key, payload,
 | 
			
		||||
                 mandatory = false, immediate = false}).
 | 
			
		||||
 | 
			
		||||
-define(Latch, 100).
 | 
			
		||||
 | 
			
		||||
%%%%
 | 
			
		||||
%
 | 
			
		||||
% This is an example of how the client interaction should work
 | 
			
		||||
| 
						 | 
				
			
			@ -48,17 +50,35 @@
 | 
			
		|||
%   amqp_channel:call(Channel, ChannelClose),
 | 
			
		||||
%   ConnectionClose = #'connection.close'{ %% set the appropriate fields },
 | 
			
		||||
%   amqp_connection:close(Connection, ConnectionClose).
 | 
			
		||||
%
 | 
			
		||||
%	
 | 
			
		||||
 | 
			
		||||
lifecycle_test(Connection) ->
 | 
			
		||||
    Realm = <<"/data">>,
 | 
			
		||||
    Q = <<"a.b.c">>,
 | 
			
		||||
    X = <<"x">>,
 | 
			
		||||
	Realm = <<"/data">>,
 | 
			
		||||
	X = <<"x">>,
 | 
			
		||||
    {Channel, Ticket} = setup_channel(Connection, Realm),
 | 
			
		||||
	ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X, type = <<"topic">>,
 | 
			
		||||
                                          passive = false, durable = false, auto_delete = false, internal = false,
 | 
			
		||||
                                          nowait = false, arguments = []},
 | 
			
		||||
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
 | 
			
		||||
	Parent = self(),
 | 
			
		||||
    [spawn(fun() -> queue_exchange_binding(Channel,Ticket,X,Parent,Tag) end) || Tag <- lists:seq(1,?Latch)],
 | 
			
		||||
	latch_loop(?Latch),
 | 
			
		||||
	ExchangeDelete = #'exchange.delete'{ticket = Ticket, exchange = X,
 | 
			
		||||
                                        if_unused = false, nowait = false},
 | 
			
		||||
    #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete),
 | 
			
		||||
    teardown(Connection, Channel).
 | 
			
		||||
 | 
			
		||||
queue_exchange_binding(Channel,Ticket,X,Parent,Tag) ->
 | 
			
		||||
	receive
 | 
			
		||||
		nothing -> ok
 | 
			
		||||
	after (?Latch - Tag rem 7) * 10 ->
 | 
			
		||||
		ok
 | 
			
		||||
	end,
 | 
			
		||||
    Q = <<"a.b.c",Tag:32>>,
 | 
			
		||||
    BindKey = <<"a.b.c.*">>,
 | 
			
		||||
    RoutingKey = <<"a.b.c.d">>,
 | 
			
		||||
    Payload = <<"foobar">>,
 | 
			
		||||
    {Channel, Ticket} = setup_channel(Connection, Realm),
 | 
			
		||||
    QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
 | 
			
		||||
	QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
 | 
			
		||||
                                    passive = false, durable = false,
 | 
			
		||||
                                    exclusive = false, auto_delete = false,
 | 
			
		||||
                                    nowait = false, arguments = []},
 | 
			
		||||
| 
						 | 
				
			
			@ -67,10 +87,6 @@ lifecycle_test(Connection) ->
 | 
			
		|||
                        consumer_count = ConsumerCount}
 | 
			
		||||
                       = amqp_channel:call(Channel,QueueDeclare),
 | 
			
		||||
    ?assertMatch(Q, Q1),
 | 
			
		||||
    ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X, type = <<"topic">>,
 | 
			
		||||
                                          passive = false, durable = false, auto_delete = false, internal = false,
 | 
			
		||||
                                          nowait = false, arguments = []},
 | 
			
		||||
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
 | 
			
		||||
    QueueBind = #'queue.bind'{ticket = Ticket, queue = Q, exchange = X,
 | 
			
		||||
                              routing_key = BindKey, nowait = false, arguments = []},
 | 
			
		||||
    #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
 | 
			
		||||
| 
						 | 
				
			
			@ -78,10 +94,7 @@ lifecycle_test(Connection) ->
 | 
			
		|||
                                  if_unused = true, if_empty = true, nowait = false},
 | 
			
		||||
    #'queue.delete_ok'{message_count = MessageCount2} = amqp_channel:call(Channel, QueueDelete),
 | 
			
		||||
    ?assertMatch(MessageCount, MessageCount2),
 | 
			
		||||
    ExchangeDelete = #'exchange.delete'{ticket = Ticket, exchange = X,
 | 
			
		||||
                                        if_unused = false, nowait = false},
 | 
			
		||||
    #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete),
 | 
			
		||||
    teardown(Connection, Channel).
 | 
			
		||||
	Parent ! finished.
 | 
			
		||||
 | 
			
		||||
channel_lifecycle_test(Connection) ->
 | 
			
		||||
    Realm = <<"/data">>,
 | 
			
		||||
| 
						 | 
				
			
			@ -309,3 +322,13 @@ setup_channel(Connection, Realm) ->
 | 
			
		|||
                               read = true},
 | 
			
		||||
    #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access),
 | 
			
		||||
    {Channel, Ticket}.
 | 
			
		||||
 | 
			
		||||
latch_loop(0) -> ok;             
 | 
			
		||||
latch_loop(Latch) ->             
 | 
			
		||||
	receive                          
 | 
			
		||||
		finished ->                  
 | 
			
		||||
		latch_loop(Latch - 1)    
 | 
			
		||||
	after ?Latch * 200 ->            
 | 
			
		||||
		exit(waited_too_long)	        
 | 
			
		||||
	end.		                     
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue