diff --git a/deps/amqp_client/include/amqp_client.hrl b/deps/amqp_client/include/amqp_client.hrl index a569ddeace..5e468e3c3d 100644 --- a/deps/amqp_client/include/amqp_client.hrl +++ b/deps/amqp_client/include/amqp_client.hrl @@ -42,7 +42,7 @@ reader_pid, writer_pid, do2, do3, - pending_rpc, + rpc_requests = queue:new(), pending_consumer, closing = false, return_handler_pid, diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index ff62321c4b..fe89476759 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -106,22 +106,18 @@ register_return_handler(Channel, ReturnHandler) -> %--------------------------------------------------------------------------- rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer, - pending_rpc = PendingRpc, - do2 = Do2}) -> - % Check whether there is an outstanding RPC request - case PendingRpc of - {Pid,Ref} -> - exit(illegal_pending_rpc); - Other -> - ok - end, - NewState = State#channel_state{pending_rpc = From}, + rpc_requests = RequestQueue, + do2 = Do2}) -> + % Enqueue the incoming RPC request to serialize RPC dispatching + NewRequestQueue = queue:in(From, RequestQueue), + NewState = State#channel_state{rpc_requests = NewRequestQueue}, Do2(Writer,Method), {noreply, NewState}. -rpc_bottom_half(Reply, State = #channel_state{pending_rpc = From}) -> +rpc_bottom_half(Reply, State = #channel_state{rpc_requests = RequestQueue}) -> + {{value, From}, NewRequestQueue} = queue:out(RequestQueue), gen_server:reply(From, Reply), - NewState = State#channel_state{pending_rpc = <<>>}, + NewState = State#channel_state{rpc_requests = NewRequestQueue}, {noreply, NewState}. resolve_consumer(ConsumerTag, #channel_state{consumers = []}) -> diff --git a/deps/amqp_client/src/test_util.erl b/deps/amqp_client/src/test_util.erl index 08e73858ee..8ab32df587 100644 --- a/deps/amqp_client/src/test_util.erl +++ b/deps/amqp_client/src/test_util.erl @@ -35,6 +35,8 @@ -record(publish,{q, x, routing_key, bind_key, payload, mandatory = false, immediate = false}). +-define(Latch, 100). + %%%% % % This is an example of how the client interaction should work @@ -48,17 +50,35 @@ % amqp_channel:call(Channel, ChannelClose), % ConnectionClose = #'connection.close'{ %% set the appropriate fields }, % amqp_connection:close(Connection, ConnectionClose). -% +% lifecycle_test(Connection) -> - Realm = <<"/data">>, - Q = <<"a.b.c">>, - X = <<"x">>, + Realm = <<"/data">>, + X = <<"x">>, + {Channel, Ticket} = setup_channel(Connection, Realm), + ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X, type = <<"topic">>, + passive = false, durable = false, auto_delete = false, internal = false, + nowait = false, arguments = []}, + #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare), + Parent = self(), + [spawn(fun() -> queue_exchange_binding(Channel,Ticket,X,Parent,Tag) end) || Tag <- lists:seq(1,?Latch)], + latch_loop(?Latch), + ExchangeDelete = #'exchange.delete'{ticket = Ticket, exchange = X, + if_unused = false, nowait = false}, + #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete), + teardown(Connection, Channel). + +queue_exchange_binding(Channel,Ticket,X,Parent,Tag) -> + receive + nothing -> ok + after (?Latch - Tag rem 7) * 10 -> + ok + end, + Q = <<"a.b.c",Tag:32>>, BindKey = <<"a.b.c.*">>, RoutingKey = <<"a.b.c.d">>, Payload = <<"foobar">>, - {Channel, Ticket} = setup_channel(Connection, Realm), - QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q, + QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q, passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = []}, @@ -67,10 +87,6 @@ lifecycle_test(Connection) -> consumer_count = ConsumerCount} = amqp_channel:call(Channel,QueueDeclare), ?assertMatch(Q, Q1), - ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X, type = <<"topic">>, - passive = false, durable = false, auto_delete = false, internal = false, - nowait = false, arguments = []}, - #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare), QueueBind = #'queue.bind'{ticket = Ticket, queue = Q, exchange = X, routing_key = BindKey, nowait = false, arguments = []}, #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind), @@ -78,10 +94,7 @@ lifecycle_test(Connection) -> if_unused = true, if_empty = true, nowait = false}, #'queue.delete_ok'{message_count = MessageCount2} = amqp_channel:call(Channel, QueueDelete), ?assertMatch(MessageCount, MessageCount2), - ExchangeDelete = #'exchange.delete'{ticket = Ticket, exchange = X, - if_unused = false, nowait = false}, - #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete), - teardown(Connection, Channel). + Parent ! finished. channel_lifecycle_test(Connection) -> Realm = <<"/data">>, @@ -309,3 +322,13 @@ setup_channel(Connection, Realm) -> read = true}, #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access), {Channel, Ticket}. + +latch_loop(0) -> ok; +latch_loop(Latch) -> + receive + finished -> + latch_loop(Latch - 1) + after ?Latch * 200 -> + exit(waited_too_long) + end. +