From 33d6b53a3a15491910ed2eac34f19b9a4dae2fe2 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sun, 7 Dec 2008 16:21:01 +0000 Subject: [PATCH] Massaged module paramterization into something more manageable --- deps/amqp_client/include/amqp_client.hrl | 2 +- deps/amqp_client/src/amqp_connection.erl | 152 +++++++++++------------ 2 files changed, 75 insertions(+), 79 deletions(-) diff --git a/deps/amqp_client/include/amqp_client.hrl b/deps/amqp_client/include/amqp_client.hrl index 56cd33e267..a44c8a5940 100644 --- a/deps/amqp_client/include/amqp_client.hrl +++ b/deps/amqp_client/include/amqp_client.hrl @@ -33,7 +33,7 @@ direct, channel_max, heartbeat, - on_close_handler, + driver, channels = dict:new() }). -record(channel_state, {number, diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 29e4303f85..9f14d9ee5c 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -30,8 +30,6 @@ -behaviour(gen_server). --record(state, {conn_state, drv_module, type}). - -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([open_channel/1, open_channel/3]). -export([start/2, start/3, start/4, close/2]). @@ -81,9 +79,9 @@ start_internal(InitialState, DriverType, ProcLink) when is_atom(DriverType) -> DriverSpec = build_driver_spec(atom_to_list(DriverType)), case ProcLink of true -> - gen_server:start_link(?MODULE, [InitialState, DriverSpec, DriverType], []); + gen_server:start_link(?MODULE, [InitialState, DriverSpec], []); false -> - gen_server:start(?MODULE, [InitialState, DriverSpec, DriverType], []) + gen_server:start(?MODULE, [InitialState, DriverSpec], []) end. %% Opens a channel without having to specify a channel number. @@ -107,38 +105,43 @@ close( {ConnectionPid, Mode}, Close) -> gen_server:call(ConnectionPid, {Mode, Cl %% Starts a new channel process, invokes the correct driver (network or direct) %% to perform any environment specific channel setup and starts the %% AMQP ChannelOpen handshake. -handle_start({ChannelNumber, OutOfBand},OpenFun,CloseFun,Do2,Do3,#state{conn_state = CS} = State) -> - {ChannelPid, Number, CS0} = start_channel(ChannelNumber,CloseFun,Do2,Do3,CS), - OpenFun({Number, OutOfBand}, ChannelPid, CS0), - #'channel.open_ok'{} = amqp_channel:call(ChannelPid, #'channel.open'{out_of_band = OutOfBand}), - {reply, ChannelPid, State#state{conn_state = CS0}}. +handle_start({ChannelNumber, OutOfBand}, + #connection_state{driver = Driver} = State) -> + {ChannelPid, Number, NewState} = start_channel(ChannelNumber, State), + Driver:open_channel({Number, OutOfBand}, ChannelPid, NewState), + #'channel.open_ok'{} = amqp_channel:call(ChannelPid, #'channel.open'{}), + {reply, ChannelPid, NewState}. %% Creates a new channel process -start_channel(ChannelNumber,CloseFun,Do2,Do3, CS) -> - ReaderPid = CS#connection_state.reader_pid, - WriterPid = CS#connection_state.channel0_writer_pid, - Number = assign_channel_number(ChannelNumber, CS), - InitialState = #channel_state{parent_connection = self(), - number = Number, - close_fun = CloseFun, - do2 = Do2, do3 = Do3, - reader_pid = ReaderPid, - writer_pid = WriterPid}, +start_channel(ChannelNumber, + State = #connection_state{driver = Driver, + reader_pid = ReaderPid, + channel0_writer_pid = WriterPid}) -> + ChannelState = + #channel_state{ + parent_connection = self(), + number = Number = assign_channel_number(ChannelNumber, State), + close_fun = fun(X) -> Driver:close_channel(X) end, + do2 = fun(X, Y) -> Driver:do(X, Y) end, + do3 = fun(X, Y, Z) -> Driver:do(X, Y, Z) end, + reader_pid = ReaderPid, + writer_pid = WriterPid}, process_flag(trap_exit, true), - {ok, ChannelPid} = gen_server:start_link(amqp_channel, [InitialState], []), - NewCS = register_channel(Number, ChannelPid, CS), - {ChannelPid, Number, NewCS}. + {ok, ChannelPid} = gen_server:start_link(amqp_channel, + [ChannelState], []), + NewState = register_channel(Number, ChannelPid, State), + {ChannelPid, Number, NewState}. -assign_channel_number(none, CS) -> - Channels = CS#connection_state.channels, - Max = CS#connection_state.channel_max, +assign_channel_number(none, #connection_state{channels = Channels, + channel_max = Max}) -> allocate_channel_number(dict:fetch_keys(Channels), Max); + assign_channel_number(ChannelNumber, _State) -> %% TODO bug: check whether this is already taken ChannelNumber. -register_channel(ChannelNumber, ChannelPid, CS) -> - Channels0 = CS#connection_state.channels, +register_channel(ChannelNumber, ChannelPid, + State = #connection_state{channels = Channels0}) -> Channels1 = case dict:is_key(ChannelNumber, Channels0) of true -> @@ -146,36 +149,33 @@ register_channel(ChannelNumber, ChannelPid, CS) -> false -> dict:store(ChannelNumber, ChannelPid, Channels0) end, - CS#connection_state{channels = Channels1}. - -%% This will be called when a channel process exits and needs to be deregistered -%% This peforms the reverse mapping so that you can lookup a channel pid -%% Let's hope that this lookup doesn't get too expensive ....... -unregister_channel(ChannelPid, DrvType, CS) when is_pid(ChannelPid)-> - Channels0 = CS#connection_state.channels, - ReverseMapping = fun(_Number, Pid) -> Pid == ChannelPid end, - Projection = dict:filter(ReverseMapping, Channels0), - Channels1 = unregister_direct(Projection, Channels0, DrvType), - CS#connection_state{channels = Channels1}; + State#connection_state{channels = Channels1}. %% This will be called when a channel process exits and needs to be %% deregistered -unregister_channel(ChannelNumber, _DrvType, CS) -> - Channels0 = CS#connection_state.channels, +%% This peforms the reverse mapping so that you can lookup a channel pid +%% Let's hope that this lookup doesn't get too expensive ....... +unregister_channel(ChannelPid, + State = #connection_state{channels = Channels0} ) + when is_pid(ChannelPid)-> + ReverseMapping = fun(_Number, Pid) -> Pid == ChannelPid end, + Projection = dict:filter(ReverseMapping, Channels0), + %% TODO This differentiation is only necessary for the direct channel, + %% look into preventing the invocation of this method + Channels1 = case dict:fetch_keys(Projection) of + [] -> + Channels0; + [ChannelNumber|_] -> + dict:erase(ChannelNumber, Channels0) + end, + State#connection_state{channels = Channels1}; + +%% This will be called when a channel process exits and needs to be +%% deregistered +unregister_channel(ChannelNumber, + State = #connection_state{channels = Channels0}) -> Channels1 = dict:erase(ChannelNumber, Channels0), - CS#connection_state{channels = Channels1}. - -%% TODO This differentiation is only necessary for the direct channel, -%% look into refactoring -unregister_direct(Projection, Channels0, direct) -> - case dict:fetch_keys(Projection) of - [] -> - Channels0; - [ChannelNumber|_] -> - dict:erase(ChannelNumber, Channels0) - end; -unregister_direct(_Projection, Channels0, _Type) -> - Channels0. + State#connection_state{channels = Channels1}. allocate_channel_number([], _Max)-> 1; @@ -184,32 +184,32 @@ allocate_channel_number(Channels, _Max) -> %% TODO check channel max and reallocate appropriately MaxChannel + 1. -close_connection(Close, From, #state{drv_module = Mod, conn_state = CS}) -> - Mod:close_connection(Close, From, CS). +close_connection(Close, From, State = #connection_state{driver = Driver}) -> + Driver:close_connection(Close, From, State). %--------------------------------------------------------------------------- % gen_server callbacks %--------------------------------------------------------------------------- -init([InitialState, DrvMod, DrvType]) - when is_atom(DrvMod), is_atom(DrvType) -> - CS = DrvMod:handshake(InitialState), % Connection state - {ok, #state{conn_state = CS, drv_module = DrvMod, type = DrvType}}. +init([InitialState, Driver]) when is_atom(Driver) -> + State = Driver:handshake(InitialState), + {ok, State#connection_state{driver = Driver} }. %% Starts a new channel -handle_call({_Whatever, ChannelNumber, OutOfBand}, _From, - #state{drv_module = Module} = State) -> +%% TODO This is very leaky gen_server callback - could get tagged properly +%% to avoid ambiguous calls +handle_call({_Whatever, ChannelNumber, OutOfBand}, _From, State) -> handle_start( {ChannelNumber, OutOfBand}, - fun(X, Y, Z) -> Module:open_channel(X, Y, Z) end, - fun(X) -> Module:close_channel(X) end, - fun(X, Y) -> Module:do(X, Y) end, - fun(X, Y, Z) -> Module:do(X, Y, Z) end, + % fun(X, Y, Z) -> Module:open_channel(X, Y, Z) end, + % fun(X) -> Module:close_channel(X) end, + % fun(X, Y) -> Module:do(X, Y) end, + % fun(X, Y, Z) -> Module:do(X, Y, Z) end, State ); %% Shuts the AMQP connection down -handle_call({_Mode, Close = #'connection.close'{}}, From, #state{} = State) -> +handle_call({_Mode, Close = #'connection.close'{}}, From, State) -> close_connection(Close, From, State), {stop,normal,State}. @@ -223,16 +223,16 @@ handle_cast(_Message, State) -> handle_info({method, #'connection.close'{reply_code = Code, reply_text = Text}, _Content}, - State = #connection_state{on_close_handler = OnCloseHandler}) -> + State = #connection_state{driver = Driver}) -> io:format("Broker forced connection: ~p -> ~p~n", [Code, Text]), - OnCloseHandler(State), + Driver:handle_broker_close(State), {stop, normal, State}; %--------------------------------------------------------------------------- % Trap exits %--------------------------------------------------------------------------- -handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, #state{} = State) -> +handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) -> io:format("Channel Peer ~p sent this message: ~p -> ~p~n",[Pid,Msg,Context]), {HardError, Code, Text} = rabbit_framing:lookup_amqp_exception(Reason), case HardError of @@ -245,20 +245,16 @@ handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, #state{} = State) -> end; %% Just the amqp channel shutting down, so unregister this channel -handle_info( {'EXIT', Pid, normal}, #state{conn_state = CS, - type = Type} = State) -> - NewCS = unregister_channel(Pid, Type, CS), - {noreply, State#state{conn_state = NewCS}}; +handle_info( {'EXIT', Pid, normal}, State) -> + {noreply, unregister_channel(Pid, State) }; % This is a special case for abruptly closed socket connections handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) -> {stop, {socket_error, Reason}, State}; -handle_info( {'EXIT', Pid, Reason}, #state{conn_state = CS, - type = Type} = State) -> +handle_info( {'EXIT', Pid, Reason}, State) -> io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]), - NewCS = unregister_channel(Pid, Type, CS), - {noreply, State#state{conn_state = NewCS}}. + {noreply, unregister_channel(Pid, State) }. %--------------------------------------------------------------------------- % Rest of the gen_server callbacks @@ -267,6 +263,6 @@ handle_info( {'EXIT', Pid, Reason}, #state{conn_state = CS, terminate(_Reason, _State) -> ok. -code_change(_OldVsn, #state{} = State, _Extra) -> +code_change(_OldVsn, State, _Extra) -> State.