From 118c962486ba31ba674c53e66be9a728b82fbc90 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Tue, 6 Jan 2009 18:07:55 +0000 Subject: [PATCH] If the client initiates the connection close, it will close the socket rather than waiting for the server --- deps/amqp_client/src/amqp_network_driver.erl | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/deps/amqp_client/src/amqp_network_driver.erl b/deps/amqp_client/src/amqp_network_driver.erl index 19f9012a10..f8d3bd35e3 100644 --- a/deps/amqp_client/src/amqp_network_driver.erl +++ b/deps/amqp_client/src/amqp_network_driver.erl @@ -78,7 +78,7 @@ close_channel(WriterPid) -> %% This closes the writer down, waits for the confirmation from the %% the channel and then returns the ack to the user close_connection(Close = #'connection.close'{}, From, - #connection_state{channel0_writer_pid = Writer, reader_pid = Reader}) -> + #connection_state{channel0_writer_pid = Writer}) -> rabbit_writer:send_command(Writer, Close), rabbit_writer:shutdown(Writer), receive @@ -87,8 +87,7 @@ close_connection(Close = #'connection.close'{}, From, after 5000 -> exit(timeout_on_exit) - end, - erlang:send_after(?SOCKET_CLOSING_TIMEOUT, Reader, close). + end. do(Writer, Method) -> rabbit_writer:send_command(Writer, Method). do(Writer, Method, Content) -> rabbit_writer:send_command(Writer, Method, Content). @@ -98,7 +97,7 @@ handle_broker_close(#connection_state{channel0_writer_pid = Writer, CloseOk = #'connection.close_ok'{}, rabbit_writer:send_command(Writer, CloseOk), rabbit_writer:shutdown(Writer), - Reader ! close. + erlang:send_after(?SOCKET_CLOSING_TIMEOUT, Reader, close). %--------------------------------------------------------------------------- % AMQP message sending and receiving @@ -166,9 +165,13 @@ start_writer(Sock, Channel) -> reader_loop(Sock, Type, Channel, Length) -> receive {inet_async, Sock, _, {ok, <>} } -> - handle_frame(Type, Channel, Payload), - {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), - reader_loop(Sock, undefined, undefined, undefined); + case handle_frame(Type, Channel, Payload) of + closed_ok -> + ok; + _ -> + {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), + reader_loop(Sock, undefined, undefined, undefined) + end; {inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} -> {ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1), reader_loop(Sock, _Type, _Channel, PayloadSize); @@ -206,6 +209,9 @@ handle_frame(Type, Channel, Payload) -> rabbit_misc:die(frame_error); trace when Channel /= 0 -> rabbit_misc:die(frame_error); + {method,'connection.close_ok',Content} -> + send_frame(Channel, {method, 'connection.close_ok', Content}), + closed_ok; AnalyzedFrame -> send_frame(Channel, AnalyzedFrame) end.