merge bug19432 into default

This commit is contained in:
Matthias Radestock 2008-10-15 12:19:39 +01:00
commit 7d47d3c66f
4 changed files with 31 additions and 11 deletions

View File

@ -130,16 +130,18 @@ rpc_bottom_half(#'channel.close'{reply_code = ReplyCode,
rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
rpc_requests = RequestQueue,
do2 = Do2}) ->
{{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue),
gen_server:reply(From, Reply),
catch case queue:head(NewRequestQueue) of
empty ->
ok;
{NewFrom,Method} ->
Do2(Writer,Method)
NewRequestQueue =
case queue:out(RequestQueue) of
{empty, {[],[]}} -> exit(empty_rpc_bottom_half);
{{value, {From, _}}, Q} -> gen_server:reply(From, Reply),
Q
end,
case queue:is_empty(NewRequestQueue) of
true -> ok;
false -> {_NewFrom, Method} = queue:head(NewRequestQueue),
Do2(Writer, Method)
end,
NewState = State#channel_state{rpc_requests = NewRequestQueue},
{noreply, NewState}.
{noreply, State#channel_state{rpc_requests = NewRequestQueue}}.
subscription_top_half(Method, From, State = #channel_state{writer_pid = Writer, do2 = Do2}) ->
Do2(Writer,Method),

View File

@ -46,6 +46,7 @@ lifecycle_test() -> test_util:lifecycle_test(new_connection()).
basic_ack_test() ->test_util:basic_ack_test(new_connection()).
command_serialization_test() -> test_util:command_serialization_test(new_connection()).
%----------------------------------------------------------------------------
% Negative Tests

View File

@ -53,6 +53,9 @@ basic_ack_test() ->
channel_lifecycle_test() ->
test_util:channel_lifecycle_test(new_connection()).
command_serialization_test() ->
test_util:command_serialization_test(new_connection()).
%----------------------------------------------------------------------------
% Negative Tests

View File

@ -35,7 +35,7 @@
-record(publish,{q, x, routing_key, bind_key, payload,
mandatory = false, immediate = false}).
-define(Latch, 1).
-define(Latch, 100).
-define(Wait, 200).
%%%%
@ -85,6 +85,20 @@ channel_lifecycle_test(Connection) ->
lib_amqp:teardown(Connection, Channel2),
ok.
% This is designed to exercize the internal queuing mechanism
% to ensure that commands are properly serialized
command_serialization_test(Connection) ->
Channel = lib_amqp:start_channel(Connection),
Parent = self(),
[spawn(fun() ->
Q = uuid(),
Q1 = lib_amqp:declare_queue(Channel, Q),
?assertMatch(Q, Q1),
Parent ! finished
end) || Tag <- lists:seq(1,?Latch)],
latch_loop(?Latch),
lib_amqp:teardown(Connection, Channel).
basic_get_test(Connection) ->
Channel = lib_amqp:start_channel(Connection),
{ok, Q} = setup_publish(Channel),