Added close handler to network driver
This commit is contained in:
parent
2076ae22ba
commit
6c1106d5ca
|
|
@ -33,6 +33,7 @@
|
|||
direct,
|
||||
channel_max,
|
||||
heartbeat,
|
||||
on_close_handler,
|
||||
channels = dict:new() }).
|
||||
|
||||
-record(channel_state, {number,
|
||||
|
|
|
|||
|
|
@ -45,9 +45,11 @@
|
|||
start(User,Password) -> start(User,Password,false).
|
||||
start(User,Password,ProcLink) when is_boolean(ProcLink) ->
|
||||
Handshake = fun amqp_direct_driver:handshake/1,
|
||||
BrokerCloseHandler = fun amqp_direct_driver:handle_broker_close/1,
|
||||
InitialState = #connection_state{username = User,
|
||||
password = Password,
|
||||
vhostpath = <<"/">>},
|
||||
vhostpath = <<"/">>,
|
||||
on_close_handler = BrokerCloseHandler},
|
||||
{ok, Pid} = start_internal(InitialState, Handshake,ProcLink),
|
||||
{Pid, direct};
|
||||
|
||||
|
|
@ -56,13 +58,15 @@ start(User,Password,Host) -> start(User,Password,Host,<<"/">>,false).
|
|||
start(User,Password,Host,VHost) -> start(User,Password,Host,VHost,false).
|
||||
start(User,Password,Host,VHost,ProcLink) ->
|
||||
Handshake = fun amqp_network_driver:handshake/1,
|
||||
BrokerCloseHandler = fun amqp_network_driver:handle_broker_close/1,
|
||||
InitialState = #connection_state{username = User,
|
||||
password = Password,
|
||||
serverhost = Host,
|
||||
vhostpath = VHost},
|
||||
vhostpath = VHost,
|
||||
on_close_handler = BrokerCloseHandler},
|
||||
{ok, Pid} = start_internal(InitialState, Handshake,ProcLink),
|
||||
{Pid, network}.
|
||||
|
||||
|
||||
start_link(User,Password) -> start(User,Password,true).
|
||||
start_link(User,Password,Host) -> start(User,Password,Host,<<"/">>,true).
|
||||
start_link(User,Password,Host,VHost) -> start(User,Password,Host,VHost,true).
|
||||
|
|
@ -206,8 +210,10 @@ handle_cast(Message, State) ->
|
|||
|
||||
handle_info({method, #'connection.close'{reply_code = Code,
|
||||
reply_text = Text},
|
||||
_Content}, State) ->
|
||||
_Content},
|
||||
State = #connection_state{on_close_handler = OnCloseHandler}) ->
|
||||
io:format("Broker forced connection: ~p -> ~p~n", [Code, Text]),
|
||||
OnCloseHandler(State),
|
||||
{stop, normal, State};
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
|
||||
-export([do/2,do/3]).
|
||||
-export([handle_broker_close/1]).
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% Driver API Methods
|
||||
|
|
@ -59,3 +60,5 @@ close_connection(Close, From, State) -> gen_server:reply(From, #'connection.clos
|
|||
|
||||
do(Writer, Method) -> rabbit_channel:do(Writer, Method).
|
||||
do(Writer, Method, Content) -> rabbit_channel:do(Writer, Method, Content).
|
||||
|
||||
handle_broker_close(State) -> ok.
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@
|
|||
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
|
||||
-export([start_reader/2, start_writer/2]).
|
||||
-export([do/2,do/3]).
|
||||
-export([handle_broker_close/1]).
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% Driver API Methods
|
||||
|
|
@ -90,6 +91,12 @@ close_connection(Close = #'connection.close'{}, From,
|
|||
do(Writer, Method) -> rabbit_writer:send_command(Writer, Method).
|
||||
do(Writer, Method, Content) -> rabbit_writer:send_command(Writer, Method, Content).
|
||||
|
||||
handle_broker_close(#connection_state{channel0_writer_pid = Writer, reader_pid = Reader}) ->
|
||||
CloseOk = #'connection.close_ok'{},
|
||||
rabbit_writer:send_command(Writer, CloseOk),
|
||||
rabbit_writer:shutdown(Writer),
|
||||
Reader ! close.
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% AMQP message sending and receiving
|
||||
%---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Reference in New Issue