merge bug19625 into default

This commit is contained in:
Matthias Radestock 2009-01-07 17:17:57 +00:00
commit ea798a505b
5 changed files with 44 additions and 9 deletions

View File

@ -33,6 +33,7 @@
direct,
channel_max,
heartbeat,
on_close_handler,
channels = dict:new() }).
-record(channel_state, {number,

View File

@ -45,9 +45,11 @@
start(User,Password) -> start(User,Password,false).
start(User,Password,ProcLink) when is_boolean(ProcLink) ->
Handshake = fun amqp_direct_driver:handshake/1,
BrokerCloseHandler = fun amqp_direct_driver:handle_broker_close/1,
InitialState = #connection_state{username = User,
password = Password,
vhostpath = <<"/">>},
vhostpath = <<"/">>,
on_close_handler = BrokerCloseHandler},
{ok, Pid} = start_internal(InitialState, Handshake,ProcLink),
{Pid, direct};
@ -56,13 +58,15 @@ start(User,Password,Host) -> start(User,Password,Host,<<"/">>,false).
start(User,Password,Host,VHost) -> start(User,Password,Host,VHost,false).
start(User,Password,Host,VHost,ProcLink) ->
Handshake = fun amqp_network_driver:handshake/1,
BrokerCloseHandler = fun amqp_network_driver:handle_broker_close/1,
InitialState = #connection_state{username = User,
password = Password,
serverhost = Host,
vhostpath = VHost},
vhostpath = VHost,
on_close_handler = BrokerCloseHandler},
{ok, Pid} = start_internal(InitialState, Handshake,ProcLink),
{Pid, network}.
start_link(User,Password) -> start(User,Password,true).
start_link(User,Password,Host) -> start(User,Password,Host,<<"/">>,true).
start_link(User,Password,Host,VHost) -> start(User,Password,Host,VHost,true).
@ -200,6 +204,18 @@ handle_call({Mode, Close = #'connection.close'{}}, From, State) ->
handle_cast(_Message, State) ->
{noreply, State}.
%---------------------------------------------------------------------------
% Handle forced close from the broker
%---------------------------------------------------------------------------
handle_info({method, #'connection.close'{reply_code = Code,
reply_text = Text},
_Content},
State = #connection_state{on_close_handler = OnCloseHandler}) ->
io:format("Broker forced connection: ~p -> ~p~n", [Code, Text]),
OnCloseHandler(State),
{stop, normal, State};
%---------------------------------------------------------------------------
% Trap exits
%---------------------------------------------------------------------------

View File

@ -31,6 +31,7 @@
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
-export([do/2,do/3]).
-export([handle_broker_close/1]).
%---------------------------------------------------------------------------
% Driver API Methods
@ -60,3 +61,5 @@ close_connection(_Close, From, _State) ->
do(Writer, Method) -> rabbit_channel:do(Writer, Method).
do(Writer, Method, Content) -> rabbit_channel:do(Writer, Method, Content).
handle_broker_close(_State) -> ok.

View File

@ -32,6 +32,9 @@
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
-export([start_reader/2, start_writer/2]).
-export([do/2,do/3]).
-export([handle_broker_close/1]).
-define(SOCKET_CLOSING_TIMEOUT, 1000).
%---------------------------------------------------------------------------
% Driver API Methods
@ -75,7 +78,7 @@ close_channel(WriterPid) ->
%% This closes the writer down, waits for the confirmation from the
%% the channel and then returns the ack to the user
close_connection(Close = #'connection.close'{}, From,
#connection_state{channel0_writer_pid = Writer, reader_pid = Reader}) ->
#connection_state{channel0_writer_pid = Writer}) ->
rabbit_writer:send_command(Writer, Close),
rabbit_writer:shutdown(Writer),
receive
@ -84,12 +87,18 @@ close_connection(Close = #'connection.close'{}, From,
after
5000 ->
exit(timeout_on_exit)
end,
Reader ! close.
end.
do(Writer, Method) -> rabbit_writer:send_command(Writer, Method).
do(Writer, Method, Content) -> rabbit_writer:send_command(Writer, Method, Content).
handle_broker_close(#connection_state{channel0_writer_pid = Writer,
reader_pid = Reader}) ->
CloseOk = #'connection.close_ok'{},
rabbit_writer:send_command(Writer, CloseOk),
rabbit_writer:shutdown(Writer),
erlang:send_after(?SOCKET_CLOSING_TIMEOUT, Reader, close).
%---------------------------------------------------------------------------
% AMQP message sending and receiving
%---------------------------------------------------------------------------
@ -166,6 +175,8 @@ reader_loop(Sock, Type, Channel, Length) ->
{inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} ->
{ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
reader_loop(Sock, _Type, _Channel, PayloadSize);
{inet_async, Sock, _Ref, {error, closed}} ->
ok;
{inet_async, Sock, _Ref, {error, Reason}} ->
io:format("Socket error: ~p~n", [Reason]),
exit({socket_error, Reason});
@ -196,14 +207,15 @@ handle_frame(Type, Channel, Payload) ->
case rabbit_reader:analyze_frame(Type, Payload) of
heartbeat when Channel /= 0 ->
rabbit_misc:die(frame_error);
heartbeat ->
heartbeat;
trace when Channel /= 0 ->
rabbit_misc:die(frame_error);
%% Match heartbeats and trace frames, but don't do anything with them
heartbeat ->
heartbeat;
trace ->
trace;
{method,'connection.close_ok',Content} ->
send_frame(Channel, {method,'connection.close_ok',Content}),
send_frame(Channel, {method, 'connection.close_ok', Content}),
closed_ok;
AnalyzedFrame ->
send_frame(Channel, AnalyzedFrame)

View File

@ -59,6 +59,9 @@ queue_unbind_test() ->
command_serialization_test() ->
test_util:command_serialization_test(new_connection()).
teardown_test() ->
test_util:teardown_test(new_connection()).
rpc_test() ->
test_util:rpc_test(new_connection()).