Merged default into 18557
This commit is contained in:
commit
157007943b
|
|
@ -319,6 +319,16 @@ handle_info(shutdown, State) ->
|
|||
NewState = channel_cleanup(State),
|
||||
{stop, normal, NewState};
|
||||
|
||||
%% Handle a trapped exit, e.g. from the direct peer
|
||||
%% In the direct case this is the local channel
|
||||
%% In the network case this is the process that writes to the socket
|
||||
%% on a per channel basis
|
||||
handle_info({'EXIT', Pid, Reason},
|
||||
State = #channel_state{number = Number}) ->
|
||||
io:format("Channel ~p is shutting down due to: ~p~n",[Number, Reason]),
|
||||
NewState = channel_cleanup(State),
|
||||
{stop, normal, NewState};
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% This is for a race condition between a close.close_ok and a subsequent channel.open
|
||||
%---------------------------------------------------------------------------
|
||||
|
|
@ -348,3 +358,4 @@ terminate(Reason, State) ->
|
|||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
State.
|
||||
|
||||
|
|
|
|||
|
|
@ -212,14 +212,19 @@ handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) ->
|
|||
io:format("Just trapping this exit and proceding to trap an exit from the client channel process~n"),
|
||||
{noreply, State};
|
||||
true ->
|
||||
io:format("A hard error has occurred, this forces the connection to end~n"),
|
||||
{stop,normal,State}
|
||||
io:format("Hard error: (Code = ~p, Text = ~p)~n", [Code, Text]),
|
||||
{stop, {hard_error, {Code, Text}}, State}
|
||||
end;
|
||||
|
||||
%% Just the amqp channel shutting down, so unregister this channel
|
||||
handle_info( {'EXIT', Pid, normal}, State) ->
|
||||
NewState = unregister_channel(Pid, State),
|
||||
{noreply, NewState};
|
||||
|
||||
% This is a special case for abruptly closed socket connections
|
||||
handle_info( {'EXIT', Pid, {socket_error, Reason}}, State) ->
|
||||
{stop, {socket_error, Reason}, State};
|
||||
|
||||
handle_info( {'EXIT', Pid, Reason}, State) ->
|
||||
io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]),
|
||||
NewState = unregister_channel(Pid, State),
|
||||
|
|
|
|||
|
|
@ -171,7 +171,8 @@ reader_loop(Sock, Type, Channel, Length) ->
|
|||
{ok, Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
|
||||
reader_loop(Sock, _Type, _Channel, PayloadSize);
|
||||
{inet_async, Sock, Ref, {error, Reason}} ->
|
||||
io:format("Have a look into this one: ~p~n",[Reason]);
|
||||
io:format("Socket error: ~p~n", [Reason]),
|
||||
exit({socket_error, Reason});
|
||||
{heartbeat, Heartbeat} ->
|
||||
rabbit_heartbeat:start_heartbeat(Sock, Heartbeat),
|
||||
reader_loop(Sock, Type, Channel, Length);
|
||||
|
|
@ -187,7 +188,8 @@ reader_loop(Sock, Type, Channel, Length) ->
|
|||
erase(H),
|
||||
reader_loop(Sock, Type, Channel, Length);
|
||||
Other ->
|
||||
io:format("Other ~p~n",[Other])
|
||||
io:format("Unknown message type: ~p~n", [Other]),
|
||||
exit({unknown_message_type, Other})
|
||||
end.
|
||||
|
||||
start_framing_channel(ChannelPid, ChannelNumber) ->
|
||||
|
|
|
|||
|
|
@ -51,13 +51,17 @@ close_channel(Channel) ->
|
|||
#'channel.close_ok'{} = amqp_channel:call(Channel, ChannelClose),
|
||||
ok.
|
||||
|
||||
teardown(Connection, Channel) ->
|
||||
close_channel(Channel),
|
||||
close_connection(Connection) ->
|
||||
ConnectionClose = #'connection.close'{reply_code = 200, reply_text = <<"Goodbye">>,
|
||||
class_id = 0, method_id = 0},
|
||||
#'connection.close_ok'{} = amqp_connection:close(Connection, ConnectionClose),
|
||||
ok.
|
||||
|
||||
teardown(Connection, Channel) ->
|
||||
close_channel(Channel),
|
||||
close_connection(Connection).
|
||||
|
||||
|
||||
get(Channel, Q) -> get(Channel, Q, true).
|
||||
|
||||
get(Channel, Q, NoAck) ->
|
||||
|
|
|
|||
|
|
@ -43,4 +43,5 @@ non_existent_exchange_test(Connection) ->
|
|||
end,
|
||||
?assertNot(is_process_alive(Channel)),
|
||||
{Pid,_} = Connection,
|
||||
?assert(is_process_alive(Pid)).
|
||||
?assert(is_process_alive(Pid)),
|
||||
lib_amqp:close_connection(Connection).
|
||||
|
|
@ -35,7 +35,13 @@
|
|||
-record(publish,{q, x, routing_key, bind_key, payload,
|
||||
mandatory = false, immediate = false}).
|
||||
|
||||
% The latch constant defines how many processes are spawned in order
|
||||
% to run certain functionality in parallel. It follows the standard
|
||||
% countdown latch pattern.
|
||||
-define(Latch, 100).
|
||||
|
||||
% The wait constant defines how long a consumer waits before it
|
||||
% unsubscribes
|
||||
-define(Wait, 200).
|
||||
|
||||
%%%%
|
||||
|
|
@ -168,7 +174,8 @@ basic_return_test(Connection) ->
|
|||
io:format(">>>Rec'd ~p/~p~n",[WhatsThis])
|
||||
after 2000 ->
|
||||
exit(no_return_received)
|
||||
end.
|
||||
end,
|
||||
lib_amqp:teardown(Connection, Channel).
|
||||
|
||||
basic_ack_test(Connection) ->
|
||||
Channel = lib_amqp:start_channel(Connection),
|
||||
|
|
@ -265,7 +272,8 @@ producer_loop(Channel, RoutingKey, N) ->
|
|||
producer_loop(Channel, RoutingKey, N - 1).
|
||||
|
||||
% Reject is not yet implemented in RabbitMQ
|
||||
basic_reject_test(Connection) -> ok.
|
||||
basic_reject_test(Connection) ->
|
||||
lib_amqp:close_connection(Connection).
|
||||
|
||||
setup_publish(Channel) ->
|
||||
Publish = #publish{routing_key = <<"a.b.c.d">>,
|
||||
|
|
|
|||
Loading…
Reference in New Issue