Massaged module paramterization into something more manageable

This commit is contained in:
Ben Hood 2008-12-07 16:21:01 +00:00
parent 0b2ef2b0b5
commit 33d6b53a3a
2 changed files with 75 additions and 79 deletions

View File

@ -33,7 +33,7 @@
direct,
channel_max,
heartbeat,
on_close_handler,
driver,
channels = dict:new() }).
-record(channel_state, {number,

View File

@ -30,8 +30,6 @@
-behaviour(gen_server).
-record(state, {conn_state, drv_module, type}).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([open_channel/1, open_channel/3]).
-export([start/2, start/3, start/4, close/2]).
@ -81,9 +79,9 @@ start_internal(InitialState, DriverType, ProcLink) when is_atom(DriverType) ->
DriverSpec = build_driver_spec(atom_to_list(DriverType)),
case ProcLink of
true ->
gen_server:start_link(?MODULE, [InitialState, DriverSpec, DriverType], []);
gen_server:start_link(?MODULE, [InitialState, DriverSpec], []);
false ->
gen_server:start(?MODULE, [InitialState, DriverSpec, DriverType], [])
gen_server:start(?MODULE, [InitialState, DriverSpec], [])
end.
%% Opens a channel without having to specify a channel number.
@ -107,38 +105,43 @@ close( {ConnectionPid, Mode}, Close) -> gen_server:call(ConnectionPid, {Mode, Cl
%% Starts a new channel process, invokes the correct driver (network or direct)
%% to perform any environment specific channel setup and starts the
%% AMQP ChannelOpen handshake.
handle_start({ChannelNumber, OutOfBand},OpenFun,CloseFun,Do2,Do3,#state{conn_state = CS} = State) ->
{ChannelPid, Number, CS0} = start_channel(ChannelNumber,CloseFun,Do2,Do3,CS),
OpenFun({Number, OutOfBand}, ChannelPid, CS0),
#'channel.open_ok'{} = amqp_channel:call(ChannelPid, #'channel.open'{out_of_band = OutOfBand}),
{reply, ChannelPid, State#state{conn_state = CS0}}.
handle_start({ChannelNumber, OutOfBand},
#connection_state{driver = Driver} = State) ->
{ChannelPid, Number, NewState} = start_channel(ChannelNumber, State),
Driver:open_channel({Number, OutOfBand}, ChannelPid, NewState),
#'channel.open_ok'{} = amqp_channel:call(ChannelPid, #'channel.open'{}),
{reply, ChannelPid, NewState}.
%% Creates a new channel process
start_channel(ChannelNumber,CloseFun,Do2,Do3, CS) ->
ReaderPid = CS#connection_state.reader_pid,
WriterPid = CS#connection_state.channel0_writer_pid,
Number = assign_channel_number(ChannelNumber, CS),
InitialState = #channel_state{parent_connection = self(),
number = Number,
close_fun = CloseFun,
do2 = Do2, do3 = Do3,
reader_pid = ReaderPid,
writer_pid = WriterPid},
start_channel(ChannelNumber,
State = #connection_state{driver = Driver,
reader_pid = ReaderPid,
channel0_writer_pid = WriterPid}) ->
ChannelState =
#channel_state{
parent_connection = self(),
number = Number = assign_channel_number(ChannelNumber, State),
close_fun = fun(X) -> Driver:close_channel(X) end,
do2 = fun(X, Y) -> Driver:do(X, Y) end,
do3 = fun(X, Y, Z) -> Driver:do(X, Y, Z) end,
reader_pid = ReaderPid,
writer_pid = WriterPid},
process_flag(trap_exit, true),
{ok, ChannelPid} = gen_server:start_link(amqp_channel, [InitialState], []),
NewCS = register_channel(Number, ChannelPid, CS),
{ChannelPid, Number, NewCS}.
{ok, ChannelPid} = gen_server:start_link(amqp_channel,
[ChannelState], []),
NewState = register_channel(Number, ChannelPid, State),
{ChannelPid, Number, NewState}.
assign_channel_number(none, CS) ->
Channels = CS#connection_state.channels,
Max = CS#connection_state.channel_max,
assign_channel_number(none, #connection_state{channels = Channels,
channel_max = Max}) ->
allocate_channel_number(dict:fetch_keys(Channels), Max);
assign_channel_number(ChannelNumber, _State) ->
%% TODO bug: check whether this is already taken
ChannelNumber.
register_channel(ChannelNumber, ChannelPid, CS) ->
Channels0 = CS#connection_state.channels,
register_channel(ChannelNumber, ChannelPid,
State = #connection_state{channels = Channels0}) ->
Channels1 =
case dict:is_key(ChannelNumber, Channels0) of
true ->
@ -146,36 +149,33 @@ register_channel(ChannelNumber, ChannelPid, CS) ->
false ->
dict:store(ChannelNumber, ChannelPid, Channels0)
end,
CS#connection_state{channels = Channels1}.
%% This will be called when a channel process exits and needs to be deregistered
%% This peforms the reverse mapping so that you can lookup a channel pid
%% Let's hope that this lookup doesn't get too expensive .......
unregister_channel(ChannelPid, DrvType, CS) when is_pid(ChannelPid)->
Channels0 = CS#connection_state.channels,
ReverseMapping = fun(_Number, Pid) -> Pid == ChannelPid end,
Projection = dict:filter(ReverseMapping, Channels0),
Channels1 = unregister_direct(Projection, Channels0, DrvType),
CS#connection_state{channels = Channels1};
State#connection_state{channels = Channels1}.
%% This will be called when a channel process exits and needs to be
%% deregistered
unregister_channel(ChannelNumber, _DrvType, CS) ->
Channels0 = CS#connection_state.channels,
%% This peforms the reverse mapping so that you can lookup a channel pid
%% Let's hope that this lookup doesn't get too expensive .......
unregister_channel(ChannelPid,
State = #connection_state{channels = Channels0} )
when is_pid(ChannelPid)->
ReverseMapping = fun(_Number, Pid) -> Pid == ChannelPid end,
Projection = dict:filter(ReverseMapping, Channels0),
%% TODO This differentiation is only necessary for the direct channel,
%% look into preventing the invocation of this method
Channels1 = case dict:fetch_keys(Projection) of
[] ->
Channels0;
[ChannelNumber|_] ->
dict:erase(ChannelNumber, Channels0)
end,
State#connection_state{channels = Channels1};
%% This will be called when a channel process exits and needs to be
%% deregistered
unregister_channel(ChannelNumber,
State = #connection_state{channels = Channels0}) ->
Channels1 = dict:erase(ChannelNumber, Channels0),
CS#connection_state{channels = Channels1}.
%% TODO This differentiation is only necessary for the direct channel,
%% look into refactoring
unregister_direct(Projection, Channels0, direct) ->
case dict:fetch_keys(Projection) of
[] ->
Channels0;
[ChannelNumber|_] ->
dict:erase(ChannelNumber, Channels0)
end;
unregister_direct(_Projection, Channels0, _Type) ->
Channels0.
State#connection_state{channels = Channels1}.
allocate_channel_number([], _Max)-> 1;
@ -184,32 +184,32 @@ allocate_channel_number(Channels, _Max) ->
%% TODO check channel max and reallocate appropriately
MaxChannel + 1.
close_connection(Close, From, #state{drv_module = Mod, conn_state = CS}) ->
Mod:close_connection(Close, From, CS).
close_connection(Close, From, State = #connection_state{driver = Driver}) ->
Driver:close_connection(Close, From, State).
%---------------------------------------------------------------------------
% gen_server callbacks
%---------------------------------------------------------------------------
init([InitialState, DrvMod, DrvType])
when is_atom(DrvMod), is_atom(DrvType) ->
CS = DrvMod:handshake(InitialState), % Connection state
{ok, #state{conn_state = CS, drv_module = DrvMod, type = DrvType}}.
init([InitialState, Driver]) when is_atom(Driver) ->
State = Driver:handshake(InitialState),
{ok, State#connection_state{driver = Driver} }.
%% Starts a new channel
handle_call({_Whatever, ChannelNumber, OutOfBand}, _From,
#state{drv_module = Module} = State) ->
%% TODO This is very leaky gen_server callback - could get tagged properly
%% to avoid ambiguous calls
handle_call({_Whatever, ChannelNumber, OutOfBand}, _From, State) ->
handle_start(
{ChannelNumber, OutOfBand},
fun(X, Y, Z) -> Module:open_channel(X, Y, Z) end,
fun(X) -> Module:close_channel(X) end,
fun(X, Y) -> Module:do(X, Y) end,
fun(X, Y, Z) -> Module:do(X, Y, Z) end,
% fun(X, Y, Z) -> Module:open_channel(X, Y, Z) end,
% fun(X) -> Module:close_channel(X) end,
% fun(X, Y) -> Module:do(X, Y) end,
% fun(X, Y, Z) -> Module:do(X, Y, Z) end,
State
);
%% Shuts the AMQP connection down
handle_call({_Mode, Close = #'connection.close'{}}, From, #state{} = State) ->
handle_call({_Mode, Close = #'connection.close'{}}, From, State) ->
close_connection(Close, From, State),
{stop,normal,State}.
@ -223,16 +223,16 @@ handle_cast(_Message, State) ->
handle_info({method, #'connection.close'{reply_code = Code,
reply_text = Text},
_Content},
State = #connection_state{on_close_handler = OnCloseHandler}) ->
State = #connection_state{driver = Driver}) ->
io:format("Broker forced connection: ~p -> ~p~n", [Code, Text]),
OnCloseHandler(State),
Driver:handle_broker_close(State),
{stop, normal, State};
%---------------------------------------------------------------------------
% Trap exits
%---------------------------------------------------------------------------
handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, #state{} = State) ->
handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) ->
io:format("Channel Peer ~p sent this message: ~p -> ~p~n",[Pid,Msg,Context]),
{HardError, Code, Text} = rabbit_framing:lookup_amqp_exception(Reason),
case HardError of
@ -245,20 +245,16 @@ handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, #state{} = State) ->
end;
%% Just the amqp channel shutting down, so unregister this channel
handle_info( {'EXIT', Pid, normal}, #state{conn_state = CS,
type = Type} = State) ->
NewCS = unregister_channel(Pid, Type, CS),
{noreply, State#state{conn_state = NewCS}};
handle_info( {'EXIT', Pid, normal}, State) ->
{noreply, unregister_channel(Pid, State) };
% This is a special case for abruptly closed socket connections
handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) ->
{stop, {socket_error, Reason}, State};
handle_info( {'EXIT', Pid, Reason}, #state{conn_state = CS,
type = Type} = State) ->
handle_info( {'EXIT', Pid, Reason}, State) ->
io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]),
NewCS = unregister_channel(Pid, Type, CS),
{noreply, State#state{conn_state = NewCS}}.
{noreply, unregister_channel(Pid, State) }.
%---------------------------------------------------------------------------
% Rest of the gen_server callbacks
@ -267,6 +263,6 @@ handle_info( {'EXIT', Pid, Reason}, #state{conn_state = CS,
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, #state{} = State, _Extra) ->
code_change(_OldVsn, State, _Extra) ->
State.