Fixed punctuation

This commit is contained in:
Ben Hood 2009-01-08 12:51:02 +00:00
parent e92bf12c9c
commit 446251e4f8
11 changed files with 361 additions and 261 deletions

View File

@ -32,15 +32,16 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]). -export([call/2, call/3, cast/2, cast/3]).
-export([subscribe/3]). -export([subscribe/3]).
-export([register_direct_peer/2]). -export([register_direct_peer/2]).
-export([register_return_handler/2]). -export([register_return_handler/2]).
-export([register_flow_handler/2]). -export([register_flow_handler/2]).
%% This diagram shows the interaction between the different component processes %% This diagram shows the interaction between the different component
%% in an AMQP client scenario. %% processes in an AMQP client scenario.
%% %%
%% message* / reply* +-------+ %% message* / reply* +-------+
%% +---------------------- | queue | %% +---------------------- | queue |
@ -62,16 +63,17 @@
%% | %% |
%% [consumer tag --> consumer pid] %% [consumer tag --> consumer pid]
%% %%
%% * These notifications are processed asynchronously via handle_info/2 callbacks %% These notifications are processed asynchronously via
%% handle_info/2 callbacks
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% AMQP Channel API methods %% AMQP Channel API methods
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Generic AMQP RPC mechanism that expects a pseudo synchronous response %% Generic AMQP RPC mechanism that expects a pseudo synchronous response
call(Channel, Method) -> call(Channel, Method) ->
gen_server:call(Channel, {call, Method}). gen_server:call(Channel, {call, Method}).
%% Generic AMQP send mechanism with content %% Generic AMQP send mechanism with content
call(Channel, Method, Content) -> call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content}). gen_server:call(Channel, {call, Method, Content}).
@ -84,18 +86,18 @@ cast(Channel, Method) ->
cast(Channel, Method, Content) -> cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}). gen_server:cast(Channel, {cast, Method, Content}).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Consumer registration %% Consumer registration
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Registers a consumer pid with the channel %% Registers a consumer pid with the channel
subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) -> subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
gen_server:call(Channel, {BasicConsume, Consumer}). gen_server:call(Channel, {BasicConsume, Consumer}).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Direct peer registration %% Direct peer registration
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Registers the direct channel peer with the state of this channel. %% Registers the direct channel peer with the state of this channel.
%% This registration occurs after the amqp_channel gen_server instance %% This registration occurs after the amqp_channel gen_server instance
@ -116,15 +118,15 @@ register_return_handler(Channel, ReturnHandler) ->
register_flow_handler(Channel, FlowHandler) -> register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ). gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Internal plumbing %% Internal plumbing
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer, rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer,
rpc_requests = RequestQueue, rpc_requests = RequestQueue,
do2 = Do2}) -> do2 = Do2}) ->
% Enqueue the incoming RPC request to serialize RPC dispatching % Enqueue the incoming RPC request to serialize RPC dispatching
NewRequestQueue = queue:in({From,Method}, RequestQueue), NewRequestQueue = queue:in({From, Method}, RequestQueue),
NewState = State#channel_state{rpc_requests = NewRequestQueue}, NewState = State#channel_state{rpc_requests = NewRequestQueue},
case queue:len(NewRequestQueue) of case queue:len(NewRequestQueue) of
1 -> 1 ->
@ -134,9 +136,10 @@ rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer,
end, end,
{noreply, NewState}. {noreply, NewState}.
rpc_bottom_half(#'channel.close'{reply_code = ReplyCode, rpc_bottom_half(#'channel.close'{reply_code = ReplyCode,
reply_text = ReplyText},State) -> reply_text = ReplyText}, State) ->
io:format("Channel received close from peer, code: ~p , message: ~p~n",[ReplyCode,ReplyText]), io:format("Channel received close from peer, code: ~p , message: ~p~n",
[ReplyCode,ReplyText]),
NewState = channel_cleanup(State), NewState = channel_cleanup(State),
{stop, normal, NewState}; {stop, normal, NewState};
@ -145,9 +148,9 @@ rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
do2 = Do2}) -> do2 = Do2}) ->
NewRequestQueue = NewRequestQueue =
case queue:out(RequestQueue) of case queue:out(RequestQueue) of
{empty, {[],[]}} -> exit(empty_rpc_bottom_half); {empty, {[], []}} -> exit(empty_rpc_bottom_half);
{{value, {From, _}}, Q} -> gen_server:reply(From, Reply), {{value, {From, _}}, Q} -> gen_server:reply(From, Reply),
Q Q
end, end,
case queue:is_empty(NewRequestQueue) of case queue:is_empty(NewRequestQueue) of
true -> ok; true -> ok;
@ -163,15 +166,18 @@ resolve_consumer(_ConsumerTag, #channel_state{consumers = []}) ->
resolve_consumer(ConsumerTag, #channel_state{consumers = Consumers}) -> resolve_consumer(ConsumerTag, #channel_state{consumers = Consumers}) ->
dict:fetch(ConsumerTag, Consumers). dict:fetch(ConsumerTag, Consumers).
register_consumer(ConsumerTag, Consumer, State = #channel_state{consumers = Consumers0}) -> register_consumer(ConsumerTag, Consumer,
State = #channel_state{consumers = Consumers0}) ->
Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0), Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0),
State#channel_state{consumers = Consumers1}. State#channel_state{consumers = Consumers1}.
unregister_consumer(ConsumerTag, State = #channel_state{consumers = Consumers0}) -> unregister_consumer(ConsumerTag,
State = #channel_state{consumers = Consumers0}) ->
Consumers1 = dict:erase(ConsumerTag, Consumers0), Consumers1 = dict:erase(ConsumerTag, Consumers0),
State#channel_state{consumers = Consumers1}. State#channel_state{consumers = Consumers1}.
shutdown_writer(State = #channel_state{close_fun = CloseFun, writer_pid = WriterPid}) -> shutdown_writer(State = #channel_state{close_fun = CloseFun,
writer_pid = WriterPid}) ->
CloseFun(WriterPid), CloseFun(WriterPid),
State. State.
@ -193,41 +199,44 @@ return_handler(State = #channel_state{return_handler_pid = undefined}) ->
return_handler(State = #channel_state{return_handler_pid = ReturnHandler}) -> return_handler(State = #channel_state{return_handler_pid = ReturnHandler}) ->
{ReturnHandler, State}. {ReturnHandler, State}.
handle_method(BasicConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag}, handle_method(ConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag},
State = #channel_state{anon_sub_requests = Anon, State = #channel_state{anon_sub_requests = Anon,
tagged_sub_requests = Tagged}) -> tagged_sub_requests = Tagged}) ->
{_From, Consumer, State0} = {_From, Consumer, State0} =
case dict:find(ConsumerTag,Tagged) of case dict:find(ConsumerTag, Tagged) of
{ok, {F,C}} -> {ok, {F,C}} ->
NewTagged = dict:erase(ConsumerTag,Tagged), NewTagged = dict:erase(ConsumerTag,Tagged),
{F,C,State#channel_state{tagged_sub_requests = NewTagged}}; {F,C,State#channel_state{tagged_sub_requests = NewTagged}};
error -> error ->
case queue:out(Anon) of case queue:out(Anon) of
{empty,_} -> {empty, _} ->
exit(anonymous_queue_empty, ConsumerTag); exit(anonymous_queue_empty, ConsumerTag);
{{value, {F,C}}, NewAnon} -> {{value, {F, C}}, NewAnon} ->
{F,C,State#channel_state{anon_sub_requests = NewAnon}} {F, C,
State#channel_state{anon_sub_requests = NewAnon}}
end end
end, end,
Consumer ! BasicConsumeOk, Consumer ! ConsumeOk,
State1 = register_consumer(ConsumerTag, Consumer, State0), State1 = register_consumer(ConsumerTag, Consumer, State0),
rpc_bottom_half(BasicConsumeOk,State1); rpc_bottom_half(ConsumeOk, State1);
handle_method(BasicCancelOk = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) -> handle_method(CancelOk = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
State) ->
Consumer = resolve_consumer(ConsumerTag, State), Consumer = resolve_consumer(ConsumerTag, State),
Consumer ! BasicCancelOk, Consumer ! CancelOk,
NewState = unregister_consumer(ConsumerTag, State), NewState = unregister_consumer(ConsumerTag, State),
rpc_bottom_half(BasicCancelOk, NewState); rpc_bottom_half(CancelOk, NewState);
handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) -> handle_method(CloseOk = #'channel.close_ok'{}, State) ->
{noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State), {noreply, NewState} = rpc_bottom_half(CloseOk, State),
{stop, normal, NewState}; {stop, normal, NewState};
%% This handles the flow control flag that the broker initiates. %% This handles the flow control flag that the broker initiates.
%% If defined, it informs the flow control handler to suspend submitting %% If defined, it informs the flow control handler to suspend submitting
%% any content bearing methods %% any content bearing methods
handle_method(Flow = #'channel.flow'{active = Active}, handle_method(Flow = #'channel.flow'{active = Active},
State = #channel_state{writer_pid = Writer, do2 = Do2, State = #channel_state{writer_pid = Writer,
do2 = Do2,
flow_handler_pid = FlowHandler}) -> flow_handler_pid = FlowHandler}) ->
case FlowHandler of case FlowHandler of
undefined -> ok; undefined -> ok;
@ -239,9 +248,10 @@ handle_method(Flow = #'channel.flow'{active = Active},
handle_method(Method, State) -> handle_method(Method, State) ->
rpc_bottom_half(Method, State). rpc_bottom_half(Method, State).
handle_method(BasicDeliver = #'basic.deliver'{consumer_tag = ConsumerTag}, Content, State) -> handle_method(Deliver = #'basic.deliver'{consumer_tag = ConsumerTag},
Content, State) ->
Consumer = resolve_consumer(ConsumerTag, State), Consumer = resolve_consumer(ConsumerTag, State),
Consumer ! {BasicDeliver, Content}, Consumer ! {Deliver, Content},
{noreply, State}; {noreply, State};
%% Why is the consumer a handle_method/3 call with the network driver, %% Why is the consumer a handle_method/3 call with the network driver,
@ -257,9 +267,9 @@ handle_method(BasicReturn = #'basic.return'{}, Content, State) ->
handle_method(Method, Content, State) -> handle_method(Method, Content, State) ->
rpc_bottom_half( {Method, Content} , State). rpc_bottom_half( {Method, Content} , State).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% gen_server callbacks %% gen_server callbacks
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
init([InitialState]) -> init([InitialState]) ->
{ok, InitialState}. {ok, InitialState}.
@ -298,7 +308,8 @@ handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
rpc_top_half(Method, From, NewState). rpc_top_half(Method, From, NewState).
%% Standard implementation of the cast/2 command %% Standard implementation of the cast/2 command
handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer, do2 = Do2}) -> handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer,
do2 = Do2}) ->
Do2(Writer, Method), Do2(Writer, Method),
{noreply, State}; {noreply, State};
@ -319,7 +330,7 @@ handle_cast({cast, Method, Content},
%% Registers the direct channel peer when using the direct client %% Registers the direct channel peer when using the direct client
handle_cast({register_direct_peer, Peer}, State) -> handle_cast({register_direct_peer, Peer}, State) ->
link(Peer), link(Peer),
process_flag(trap_exit,true), process_flag(trap_exit, true),
NewState = State#channel_state{writer_pid = Peer}, NewState = State#channel_state{writer_pid = Peer},
{noreply, NewState}; {noreply, NewState};
@ -336,23 +347,29 @@ handle_cast({register_flow_handler, FlowHandler}, State) ->
handle_cast({notify_sent, _Peer}, State) -> handle_cast({notify_sent, _Peer}, State) ->
{noreply, State}. {noreply, State}.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Rabbit Writer API methods (gen_server callbacks). %% Rabbit Writer API methods (gen_server callbacks).
% These callbacks are invoked when a direct channel sends messages %% These callbacks are invoked when a direct channel sends messages
% to this gen_server instance. %% to this gen_server instance.
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
handle_info( {send_command, Method}, State) -> handle_method(Method, State); handle_info( {send_command, Method}, State) ->
handle_info( {send_command, Method, Content}, State) -> handle_method(Method, Content, State); handle_method(Method, State);
%--------------------------------------------------------------------------- handle_info( {send_command, Method, Content}, State) ->
% Network Writer methods (gen_server callbacks). handle_method(Method, Content, State);
% These callbacks are invoked when a network channel sends messages
% to this gen_server instance.
%---------------------------------------------------------------------------
handle_info( {method, Method, none}, State) -> handle_method(Method, State); %%---------------------------------------------------------------------------
handle_info( {method, Method, Content}, State) -> handle_method(Method, Content, State); %% Network Writer methods (gen_server callbacks).
%% These callbacks are invoked when a network channel sends messages
%% to this gen_server instance.
%%---------------------------------------------------------------------------
handle_info( {method, Method, none}, State) ->
handle_method(Method, State);
handle_info( {method, Method, Content}, State) ->
handle_method(Method, Content, State);
%% Handles the delivery of a message from a direct channel %% Handles the delivery of a message from a direct channel
@ -375,28 +392,24 @@ handle_info({'EXIT', _Pid, Reason},
NewState = channel_cleanup(State), NewState = channel_cleanup(State),
{stop, normal, NewState}; {stop, normal, NewState};
%--------------------------------------------------------------------------- %% This is for a race condition between a close.close_ok and a subsequent
% This is for a race condition between a close.close_ok and a subsequent channel.open %% channel.open
%---------------------------------------------------------------------------
handle_info( {channel_close, Peer}, State ) -> handle_info( {channel_close, Peer}, State ) ->
NewState = channel_cleanup(State), NewState = channel_cleanup(State),
%% TODO Do we still need this?? %% TODO Do we still need this??
Peer ! handshake, Peer ! handshake,
{noreply, NewState}; {noreply, NewState};
%--------------------------------------------------------------------------- %% This is for a channel exception that can't be otherwise handled
% This is for a channel exception that can't be otherwise handled
%---------------------------------------------------------------------------
handle_info( {channel_exception, Channel, Reason}, State) -> handle_info( {channel_exception, Channel, Reason}, State) ->
io:format("Channel ~p is shutting down due to: ~p~n",[Channel, Reason]), io:format("Channel ~p is shutting down due to: ~p~n",[Channel, Reason]),
NewState = channel_cleanup(State), NewState = channel_cleanup(State),
{stop, shutdown, NewState}. {stop, shutdown, NewState}.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Rest of the gen_server callbacks %% Rest of the gen_server callbacks
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
terminate(normal, _State) -> terminate(normal, _State) ->
ok; ok;

View File

@ -30,19 +30,20 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([open_channel/1, open_channel/3]). -export([open_channel/1, open_channel/3]).
-export([start/2, start/3, start/4, close/2]). -export([start/2, start/3, start/4, close/2]).
-export([start_link/2, start_link/3, start_link/4]). -export([start_link/2, start_link/3, start_link/4]).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% AMQP Connection API Methods %% AMQP Connection API Methods
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Starts a direct connection to the Rabbit AMQP server, assuming that %% Starts a direct connection to the Rabbit AMQP server, assuming that
%% the server is running in the same process space. %% the server is running in the same process space.
start(User,Password) -> start(User,Password,false). start(User, Password) -> start(User, Password, false).
start(User,Password,ProcLink) when is_boolean(ProcLink) -> start(User, Password, ProcLink) when is_boolean(ProcLink) ->
InitialState = #connection_state{username = User, InitialState = #connection_state{username = User,
password = Password, password = Password,
vhostpath = <<"/">>}, vhostpath = <<"/">>},
@ -50,9 +51,13 @@ start(User,Password,ProcLink) when is_boolean(ProcLink) ->
Pid; Pid;
%% Starts a networked conection to a remote AMQP server. %% Starts a networked conection to a remote AMQP server.
start(User,Password,Host) -> start(User,Password,Host,<<"/">>,false). start(User, Password, Host) ->
start(User,Password,Host,VHost) -> start(User,Password,Host,VHost,false). start(User, Password, Host, <<"/">>, false).
start(User,Password,Host,VHost,ProcLink) ->
start(User, Password, Host, VHost) ->
start(User, Password, Host, VHost, false).
start(User, Password, Host, VHost, ProcLink) ->
InitialState = #connection_state{username = User, InitialState = #connection_state{username = User,
password = Password, password = Password,
serverhost = Host, serverhost = Host,
@ -60,37 +65,43 @@ start(User,Password,Host,VHost,ProcLink) ->
{ok, Pid} = start_internal(InitialState, amqp_network_driver, ProcLink), {ok, Pid} = start_internal(InitialState, amqp_network_driver, ProcLink),
Pid. Pid.
start_link(User,Password) -> start(User,Password,true). start_link(User, Password) ->
start_link(User,Password,Host) -> start(User,Password,Host,<<"/">>,true). start(User, Password, true).
start_link(User,Password,Host,VHost) -> start(User,Password,Host,VHost,true).
start_link(User, Password, Host) ->
start(User, Password, Host, <<"/">>, true).
start_link(User, Password, Host, VHost) ->
start(User, Password, Host, VHost, true).
start_internal(InitialState, Driver, _Link = true) when is_atom(Driver) -> start_internal(InitialState, Driver, _Link = true) when is_atom(Driver) ->
gen_server:start_link(?MODULE, [InitialState, Driver], []); gen_server:start_link(?MODULE, [InitialState, Driver], []);
start_internal(InitialState, Driver, _Link = false) when is_atom(Driver) -> start_internal(InitialState, Driver, _Link = false) when is_atom(Driver) ->
gen_server:start(?MODULE, [InitialState, Driver], []). gen_server:start(?MODULE, [InitialState, Driver], []).
%% Opens a channel without having to specify a channel number. %% Opens a channel without having to specify a channel number.
%% This function assumes that an AMQP connection (networked or direct) %% This function assumes that an AMQP connection (networked or direct)
%% has already been successfully established. %% has already been successfully established.
open_channel(ConnectionPid) -> open_channel(ConnectionPid, none, ""). open_channel(ConnectionPid) ->
open_channel(ConnectionPid, none, "").
%% Opens a channel with a specific channel number. %% Opens a channel with a specific channel number.
%% This function assumes that an AMQP connection (networked or direct) %% This function assumes that an AMQP connection (networked or direct)
%% has already been successfully established. %% has already been successfully established.
open_channel(ConnectionPid, ChannelNumber, OutOfBand) -> open_channel(ConnectionPid, ChannelNumber, OutOfBand) ->
gen_server:call(ConnectionPid, gen_server:call(ConnectionPid,
{open_channel, ChannelNumber, {open_channel, ChannelNumber,
amqp_util:binary(OutOfBand)}). amqp_util:binary(OutOfBand)}).
%% Closes the AMQP connection %% Closes the AMQP connection
close(ConnectionPid, Close) -> gen_server:call(ConnectionPid, Close). close(ConnectionPid, Close) -> gen_server:call(ConnectionPid, Close).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Internal plumbing %% Internal plumbing
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Starts a new channel process, invokes the correct driver %% Starts a new channel process, invokes the correct driver
%% (network or direct) to perform any environment specific channel setup and %% (network or direct) to perform any environment specific channel setup and
%% starts the AMQP ChannelOpen handshake. %% starts the AMQP ChannelOpen handshake.
handle_open_channel({ChannelNumber, OutOfBand}, handle_open_channel({ChannelNumber, OutOfBand},
@ -112,8 +123,8 @@ start_channel(ChannelNumber,
close_fun = fun(X) -> Driver:close_channel(X) end, close_fun = fun(X) -> Driver:close_channel(X) end,
do2 = fun(X, Y) -> Driver:do(X, Y) end, do2 = fun(X, Y) -> Driver:do(X, Y) end,
do3 = fun(X, Y, Z) -> Driver:do(X, Y, Z) end, do3 = fun(X, Y, Z) -> Driver:do(X, Y, Z) end,
reader_pid = ReaderPid, reader_pid = ReaderPid,
writer_pid = WriterPid}, writer_pid = WriterPid},
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, ChannelPid} = gen_server:start_link(amqp_channel, {ok, ChannelPid} = gen_server:start_link(amqp_channel,
[ChannelState], []), [ChannelState], []),
@ -139,7 +150,7 @@ register_channel(ChannelNumber, ChannelPid,
end, end,
State#connection_state{channels = Channels1}. State#connection_state{channels = Channels1}.
%% This will be called when a channel process exits and needs to be %% This will be called when a channel process exits and needs to be
%% deregistered %% deregistered
%% This peforms the reverse mapping so that you can lookup a channel pid %% This peforms the reverse mapping so that you can lookup a channel pid
%% Let's hope that this lookup doesn't get too expensive ....... %% Let's hope that this lookup doesn't get too expensive .......
@ -157,8 +168,8 @@ unregister_channel(ChannelPid,
dict:erase(ChannelNumber, Channels0) dict:erase(ChannelNumber, Channels0)
end, end,
State#connection_state{channels = Channels1}; State#connection_state{channels = Channels1};
%% This will be called when a channel process exits and needs to be %% This will be called when a channel process exits and needs to be
%% deregistered %% deregistered
unregister_channel(ChannelNumber, unregister_channel(ChannelNumber,
State = #connection_state{channels = Channels0}) -> State = #connection_state{channels = Channels0}) ->
@ -175,9 +186,9 @@ allocate_channel_number(Channels, _Max) ->
close_connection(Close, From, State = #connection_state{driver = Driver}) -> close_connection(Close, From, State = #connection_state{driver = Driver}) ->
Driver:close_connection(Close, From, State). Driver:close_connection(Close, From, State).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% gen_server callbacks %% gen_server callbacks
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
init([InitialState, Driver]) when is_atom(Driver) -> init([InitialState, Driver]) when is_atom(Driver) ->
State = Driver:handshake(InitialState), State = Driver:handshake(InitialState),
@ -190,14 +201,14 @@ handle_call({open_channel, ChannelNumber, OutOfBand}, _From, State) ->
%% Shuts the AMQP connection down %% Shuts the AMQP connection down
handle_call(Close = #'connection.close'{}, From, State) -> handle_call(Close = #'connection.close'{}, From, State) ->
close_connection(Close, From, State), close_connection(Close, From, State),
{stop,normal,State}. {stop, normal, State}.
handle_cast(_Message, State) -> handle_cast(_Message, State) ->
{noreply, State}. {noreply, State}.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Handle forced close from the broker %% Handle forced close from the broker
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
handle_info({method, #'connection.close'{reply_code = Code, handle_info({method, #'connection.close'{reply_code = Code,
reply_text = Text}, _Content}, reply_text = Text}, _Content},
@ -206,37 +217,39 @@ handle_info({method, #'connection.close'{reply_code = Code,
Driver:handle_broker_close(State), Driver:handle_broker_close(State),
{stop, normal, State}; {stop, normal, State};
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Trap exits %% Trap exits
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) -> handle_info( {'EXIT', Pid, {amqp, Reason, Msg, Context}}, State) ->
io:format("Channel Peer ~p sent this message: ~p -> ~p~n",[Pid,Msg,Context]), io:format("Channel Peer ~p sent this message: ~p -> ~p~n",
[Pid, Msg, Context]),
{HardError, Code, Text} = rabbit_framing:lookup_amqp_exception(Reason), {HardError, Code, Text} = rabbit_framing:lookup_amqp_exception(Reason),
case HardError of case HardError of
false -> false ->
io:format("Just trapping this exit and proceding to trap an exit from the client channel process~n"), io:format("Just trapping this exit and proceding to trap an
exit from the client channel process~n"),
{noreply, State}; {noreply, State};
true -> true ->
io:format("Hard error: (Code = ~p, Text = ~p)~n", [Code, Text]), io:format("Hard error: (Code = ~p, Text = ~p)~n", [Code, Text]),
{stop, {hard_error, {Code, Text}}, State} {stop, {hard_error, {Code, Text}}, State}
end; end;
%% Just the amqp channel shutting down, so unregister this channel %% Just the amqp channel shutting down, so unregister this channel
handle_info( {'EXIT', Pid, normal}, State) -> handle_info( {'EXIT', Pid, normal}, State) ->
{noreply, unregister_channel(Pid, State) }; {noreply, unregister_channel(Pid, State) };
% This is a special case for abruptly closed socket connections %% This is a special case for abruptly closed socket connections
handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) -> handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) ->
{stop, {socket_error, Reason}, State}; {stop, {socket_error, Reason}, State};
handle_info( {'EXIT', Pid, Reason}, State) -> handle_info( {'EXIT', Pid, Reason}, State) ->
io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]), io:format("Connection: Handling exit from ~p --> ~p~n", [Pid, Reason]),
{noreply, unregister_channel(Pid, State) }. {noreply, unregister_channel(Pid, State) }.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Rest of the gen_server callbacks %% Rest of the gen_server callbacks
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.

View File

@ -30,7 +30,7 @@
-include("amqp_client.hrl"). -include("amqp_client.hrl").
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]). -export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
-export([do/2,do/3]). -export([do/2, do/3]).
-export([handle_broker_close/1]). -export([handle_broker_close/1]).
%--------------------------------------------------------------------------- %---------------------------------------------------------------------------
@ -43,23 +43,31 @@ handshake(ConnectionState = #connection_state{username = User,
UserBin = amqp_util:binary(User), UserBin = amqp_util:binary(User),
PassBin = amqp_util:binary(Pass), PassBin = amqp_util:binary(Pass),
rabbit_access_control:user_pass_login(UserBin, PassBin), rabbit_access_control:user_pass_login(UserBin, PassBin),
rabbit_access_control:check_vhost_access(#user{username = UserBin}, VHostPath), rabbit_access_control:check_vhost_access(#user{username = UserBin},
VHostPath),
ConnectionState. ConnectionState.
open_channel({_Channel, _OutOfBand}, ChannelPid, open_channel({_Channel, _OutOfBand}, ChannelPid,
State = #connection_state{username = User, vhostpath = VHost}) -> State = #connection_state{username = User,
vhostpath = VHost}) ->
UserBin = amqp_util:binary(User), UserBin = amqp_util:binary(User),
ReaderPid = WriterPid = ChannelPid, ReaderPid = WriterPid = ChannelPid,
Peer = rabbit_channel:start_link(ReaderPid, WriterPid, UserBin, VHost), Peer = rabbit_channel:start_link(ReaderPid, WriterPid, UserBin, VHost),
amqp_channel:register_direct_peer(ChannelPid, Peer), amqp_channel:register_direct_peer(ChannelPid, Peer),
State. State.
close_channel(_WriterPid) -> ok. close_channel(_WriterPid) ->
ok.
close_connection(_Close, From, _State) -> close_connection(_Close, From, _State) ->
gen_server:reply(From, #'connection.close_ok'{}). gen_server:reply(From, #'connection.close_ok'{}).
do(Writer, Method) -> rabbit_channel:do(Writer, Method). do(Writer, Method) ->
do(Writer, Method, Content) -> rabbit_channel:do(Writer, Method, Content). rabbit_channel:do(Writer, Method).
do(Writer, Method, Content) ->
rabbit_channel:do(Writer, Method, Content).
handle_broker_close(_State) ->
ok.
handle_broker_close(_State) -> ok.

View File

@ -31,7 +31,7 @@
-export([handshake/1, open_channel/3, close_channel/1, close_connection/3]). -export([handshake/1, open_channel/3, close_channel/1, close_connection/3]).
-export([start_reader/2, start_writer/2]). -export([start_reader/2, start_writer/2]).
-export([do/2,do/3]). -export([do/2, do/3]).
-export([handle_broker_close/1]). -export([handle_broker_close/1]).
-define(SOCKET_CLOSING_TIMEOUT, 1000). -define(SOCKET_CLOSING_TIMEOUT, 1000).
@ -40,23 +40,26 @@
% Driver API Methods % Driver API Methods
%--------------------------------------------------------------------------- %---------------------------------------------------------------------------
handshake(ConnectionState = #connection_state{serverhost = Host}) -> handshake(State = #connection_state{serverhost = Host}) ->
case gen_tcp:connect(Host, 5672, [binary, {packet, 0},{active,false}]) of case gen_tcp:connect(Host, 5672,
[binary, {packet, 0}, {active, false}]) of
{ok, Sock} -> {ok, Sock} ->
ok = gen_tcp:send(Sock, amqp_util:protocol_header()), ok = gen_tcp:send(Sock, amqp_util:protocol_header()),
Parent = self(), Parent = self(),
FramingPid = rabbit_framing_channel:start_link(fun(X) -> X end, [Parent]), FramingPid = rabbit_framing_channel:start_link(fun(X) -> X end,
ReaderPid = spawn_link(?MODULE, start_reader, [Sock, FramingPid]), [Parent]),
ReaderPid = spawn_link(?MODULE, start_reader,
[Sock, FramingPid]),
WriterPid = start_writer(Sock, 0), WriterPid = start_writer(Sock, 0),
ConnectionState1 = ConnectionState#connection_state{channel0_writer_pid = WriterPid, State1 = State#connection_state{channel0_writer_pid = WriterPid,
reader_pid = ReaderPid, reader_pid = ReaderPid,
sock = Sock}, sock = Sock},
ConnectionState2 = network_handshake(WriterPid, ConnectionState1), State2 = network_handshake(WriterPid, State1),
#connection_state{heartbeat = Heartbeat} = ConnectionState2, #connection_state{heartbeat = Heartbeat} = State2,
ReaderPid ! {heartbeat, Heartbeat}, ReaderPid ! {heartbeat, Heartbeat},
ConnectionState2; State2;
{error, Reason} -> {error, Reason} ->
io:format("Could not start the network driver: ~p~n",[Reason]), io:format("Could not start the network driver: ~p~n", [Reason]),
exit(Reason) exit(Reason)
end. end.
@ -72,7 +75,7 @@ open_channel({ChannelNumber, _OutOfBand}, ChannelPid,
amqp_channel:register_direct_peer(ChannelPid, WriterPid ). amqp_channel:register_direct_peer(ChannelPid, WriterPid ).
close_channel(WriterPid) -> close_channel(WriterPid) ->
%io:format("Shutting the channel writer ~p down~n",[WriterPid]), %io:format("Shutting the channel writer ~p down~n", [WriterPid]),
rabbit_writer:shutdown(WriterPid). rabbit_writer:shutdown(WriterPid).
%% This closes the writer down, waits for the confirmation from the %% This closes the writer down, waits for the confirmation from the
@ -89,8 +92,11 @@ close_connection(Close = #'connection.close'{}, From,
exit(timeout_on_exit) exit(timeout_on_exit)
end. end.
do(Writer, Method) -> rabbit_writer:send_command(Writer, Method). do(Writer, Method) ->
do(Writer, Method, Content) -> rabbit_writer:send_command(Writer, Method, Content). rabbit_writer:send_command(Writer, Method).
do(Writer, Method, Content) ->
rabbit_writer:send_command(Writer, Method, Content).
handle_broker_close(#connection_state{channel0_writer_pid = Writer, handle_broker_close(#connection_state{channel0_writer_pid = Writer,
reader_pid = Reader}) -> reader_pid = Reader}) ->
@ -117,7 +123,8 @@ recv() ->
% Internal plumbing % Internal plumbing
%--------------------------------------------------------------------------- %---------------------------------------------------------------------------
network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) -> network_handshake(Writer,
State = #connection_state{ vhostpath = VHostPath }) ->
#'connection.start'{} = recv(), #'connection.start'{} = recv(),
do(Writer, start_ok(State)), do(Writer, start_ok(State)),
#'connection.tune'{channel_max = ChannelMax, #'connection.tune'{channel_max = ChannelMax,
@ -129,7 +136,8 @@ network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) ->
do(Writer, TuneOk), do(Writer, TuneOk),
%% This is something where I don't understand the protocol, %% This is something where I don't understand the protocol,
%% What happens if the following command reaches the server before the tune ok? %% What happens if the following command reaches the server
%% before the tune ok?
%% Or doesn't get sent at all? %% Or doesn't get sent at all?
ConnectionOpen = #'connection.open'{virtual_host = VHostPath, ConnectionOpen = #'connection.open'{virtual_host = VHostPath,
capabilities = <<"">>, capabilities = <<"">>,
@ -146,7 +154,7 @@ start_ok(#connection_state{username = Username, password = Password}) ->
client_properties = [ client_properties = [
{<<"product">>, longstr, <<"Erlang-AMQC">>}, {<<"product">>, longstr, <<"Erlang-AMQC">>},
{<<"version">>, longstr, <<"0.1">>}, {<<"version">>, longstr, <<"0.1">>},
{<<"platform">>,longstr, <<"Erlang">>} {<<"platform">>, longstr, <<"Erlang">>}
], ],
mechanism = <<"AMQPLAIN">>, mechanism = <<"AMQPLAIN">>,
response = rabbit_binary_generator:generate_table(LoginTable), response = rabbit_binary_generator:generate_table(LoginTable),
@ -154,7 +162,7 @@ start_ok(#connection_state{username = Username, password = Password}) ->
start_reader(Sock, FramingPid) -> start_reader(Sock, FramingPid) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
put({channel, 0},{chpid, FramingPid}), put({channel, 0}, {chpid, FramingPid}),
{ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1),
reader_loop(Sock, undefined, undefined, undefined), reader_loop(Sock, undefined, undefined, undefined),
gen_tcp:close(Sock). gen_tcp:close(Sock).
@ -164,7 +172,7 @@ start_writer(Sock, Channel) ->
reader_loop(Sock, Type, Channel, Length) -> reader_loop(Sock, Type, Channel, Length) ->
receive receive
{inet_async, Sock, _, {ok, <<Payload:Length/binary,?FRAME_END>>} } -> {inet_async, Sock, _, {ok, <<Payload:Length/binary, ?FRAME_END>>} } ->
case handle_frame(Type, Channel, Payload) of case handle_frame(Type, Channel, Payload) of
closed_ok -> closed_ok ->
ok; ok;
@ -172,9 +180,9 @@ reader_loop(Sock, Type, Channel, Length) ->
{ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1),
reader_loop(Sock, undefined, undefined, undefined) reader_loop(Sock, undefined, undefined, undefined)
end; end;
{inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} -> {inet_async, Sock, _, {ok, <<_Type:8, _Chan:16, PayloadSize:32>>}} ->
{ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1), {ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
reader_loop(Sock, _Type, _Channel, PayloadSize); reader_loop(Sock, _Type, _Chan, PayloadSize);
{inet_async, Sock, _Ref, {error, closed}} -> {inet_async, Sock, _Ref, {error, closed}} ->
ok; ok;
{inet_async, Sock, _Ref, {error, Reason}} -> {inet_async, Sock, _Ref, {error, Reason}} ->
@ -187,11 +195,13 @@ reader_loop(Sock, Type, Channel, Length) ->
start_framing_channel(ChannelPid, ChannelNumber), start_framing_channel(ChannelPid, ChannelNumber),
reader_loop(Sock, Type, Channel, Length); reader_loop(Sock, Type, Channel, Length);
timeout -> timeout ->
io:format("Reader (~p) received timeout from heartbeat, exiting ~n",[self()]); io:format("Reader (~p) received timeout from heartbeat,
exiting ~n", [self()]);
close -> close ->
io:format("Reader (~p) received close command, exiting ~n",[self()]); io:format("Reader (~p) received close command,
exiting ~n", [self()]);
{'EXIT', Pid, _Reason} -> {'EXIT', Pid, _Reason} ->
[H|_] = get_keys({chpid,Pid}), [H|_] = get_keys({chpid, Pid}),
erase(H), erase(H),
reader_loop(Sock, Type, Channel, Length); reader_loop(Sock, Type, Channel, Length);
Other -> Other ->
@ -200,8 +210,9 @@ reader_loop(Sock, Type, Channel, Length) ->
end. end.
start_framing_channel(ChannelPid, ChannelNumber) -> start_framing_channel(ChannelPid, ChannelNumber) ->
FramingPid = rabbit_framing_channel:start_link(fun(X) -> link(X), X end, [ChannelPid]), FramingPid = rabbit_framing_channel:start_link(fun(X) -> link(X), X end,
put({channel, ChannelNumber},{chpid, FramingPid}). [ChannelPid]),
put({channel, ChannelNumber}, {chpid, FramingPid}).
handle_frame(Type, Channel, Payload) -> handle_frame(Type, Channel, Payload) ->
case rabbit_reader:analyze_frame(Type, Payload) of case rabbit_reader:analyze_frame(Type, Payload) of
@ -214,7 +225,7 @@ handle_frame(Type, Channel, Payload) ->
heartbeat; heartbeat;
trace -> trace ->
trace; trace;
{method,'connection.close_ok',Content} -> {method, 'connection.close_ok', Content} ->
send_frame(Channel, {method, 'connection.close_ok', Content}), send_frame(Channel, {method, 'connection.close_ok', Content}),
closed_ok; closed_ok;
AnalyzedFrame -> AnalyzedFrame ->
@ -228,3 +239,4 @@ resolve_receiver(Channel) ->
undefined -> undefined ->
exit(unknown_channel) exit(unknown_channel)
end. end.

View File

@ -79,7 +79,7 @@ publish(Payload, From,
reply_to = Q}, reply_to = Q},
lib_amqp:publish(Channel, X, RoutingKey, Payload, Props), lib_amqp:publish(Channel, X, RoutingKey, Payload, Props),
State#rpc_client_state{correlation_id = CorrelationId + 1, State#rpc_client_state{correlation_id = CorrelationId + 1,
continuations continuations
= dict:store(CorrelationId, From, Continuations)}. = dict:store(CorrelationId, From, Continuations)}.
%--------------------------------------------------------------------------- %---------------------------------------------------------------------------

View File

@ -33,7 +33,9 @@
-export([basic_properties/0, protocol_header/0]). -export([basic_properties/0, protocol_header/0]).
basic_properties() -> basic_properties() ->
#'P_basic'{content_type = <<"application/octet-stream">>, delivery_mode = 1, priority = 0}. #'P_basic'{content_type = <<"application/octet-stream">>,
delivery_mode = 1,
priority = 0}.
protocol_header() -> protocol_header() ->
<<"AMQP", 1, 1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>. <<"AMQP", 1, 1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>.

View File

@ -33,30 +33,39 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
basic_get_test() -> test_util:basic_get_test(new_connection()). basic_get_test() ->
test_util:basic_get_test(new_connection()).
basic_return_test() -> test_util:basic_return_test(new_connection()). basic_return_test() ->
test_util:basic_return_test(new_connection()).
basic_qos_test() -> test_util:basic_qos_test(new_connection()). basic_qos_test() ->
test_util:basic_qos_test(new_connection()).
basic_recover_test() -> test_util:basic_recover_test(new_connection()). basic_recover_test() ->
test_util:basic_recover_test(new_connection()).
basic_consume_test() -> test_util:basic_consume_test(new_connection()). basic_consume_test() ->
test_util:basic_consume_test(new_connection()).
lifecycle_test() -> test_util:lifecycle_test(new_connection()). lifecycle_test() ->
test_util:lifecycle_test(new_connection()).
basic_ack_test() ->test_util:basic_ack_test(new_connection()). basic_ack_test() ->
test_util:basic_ack_test(new_connection()).
command_serialization_test() -> test_util:command_serialization_test(new_connection()). command_serialization_test() ->
test_util:command_serialization_test(new_connection()).
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% This must be kicked off manually because it can only be run after Rabbit %% This must be kicked off manually because it can only be run after Rabbit
% has been running for 1 minute %% has been running for 1 minute
test_channel_flow() -> test_channel_flow() ->
test_util:channel_flow_test(new_connection()). test_util:channel_flow_test(new_connection()).
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Negative Tests %% Negative Tests
%%---------------------------------------------------------------------------
non_existent_exchange_test() -> non_existent_exchange_test() ->
negative_test_util:non_existent_exchange_test(new_connection()). negative_test_util:non_existent_exchange_test(new_connection()).
@ -64,10 +73,12 @@ non_existent_exchange_test() ->
queue_unbind_test() -> queue_unbind_test() ->
test_util:queue_unbind_test(new_connection()). test_util:queue_unbind_test(new_connection()).
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Common Functions %% Common Functions
%%---------------------------------------------------------------------------
new_connection() -> amqp_connection:start("guest", "guest"). new_connection() ->
amqp_connection:start("guest", "guest").
test_coverage() -> test_coverage() ->
rabbit_misc:enable_cover(), rabbit_misc:enable_cover(),

View File

@ -1,3 +1,28 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is the RabbitMQ Erlang Client.
%%
%% The Initial Developers of the Original Code are LShift Ltd.,
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd., Cohesive Financial
%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
%% Technologies Ltd.;
%%
%% All Rights Reserved.
%%
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
-module(lib_amqp). -module(lib_amqp).
-include_lib("rabbitmq_server/include/rabbit.hrl"). -include_lib("rabbitmq_server/include/rabbit.hrl").
@ -21,9 +46,12 @@ declare_exchange(Channel, X) ->
declare_exchange(Channel, X, Type) -> declare_exchange(Channel, X, Type) ->
ExchangeDeclare = #'exchange.declare'{exchange = X, ExchangeDeclare = #'exchange.declare'{exchange = X,
type = Type, type = Type,
passive = false, durable = false, passive = false,
auto_delete = false, internal = false, durable = false,
nowait = false, arguments = []}, auto_delete = false,
internal = false,
nowait = false,
arguments = []},
amqp_channel:call(Channel, ExchangeDeclare). amqp_channel:call(Channel, ExchangeDeclare).
delete_exchange(Channel, X) -> delete_exchange(Channel, X) ->
@ -31,9 +59,9 @@ delete_exchange(Channel, X) ->
if_unused = false, nowait = false}, if_unused = false, nowait = false},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete). #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% TODO This whole section of optional properties and mandatory flags %% TODO This whole section of optional properties and mandatory flags
% may have to be re-thought %% may have to be re-thought
publish(Channel, X, RoutingKey, Payload) -> publish(Channel, X, RoutingKey, Payload) ->
publish(Channel, X, RoutingKey, Payload, false). publish(Channel, X, RoutingKey, Payload, false).
@ -41,10 +69,10 @@ publish(Channel, X, RoutingKey, Payload, Mandatory)
when is_boolean(Mandatory)-> when is_boolean(Mandatory)->
publish(Channel, X, RoutingKey, Payload, Mandatory, publish(Channel, X, RoutingKey, Payload, Mandatory,
amqp_util:basic_properties()); amqp_util:basic_properties());
publish(Channel, X, RoutingKey, Payload, Properties) -> publish(Channel, X, RoutingKey, Payload, Properties) ->
publish(Channel, X, RoutingKey, Payload, false, Properties). publish(Channel, X, RoutingKey, Payload, false, Properties).
publish(Channel, X, RoutingKey, Payload, Mandatory, Properties) -> publish(Channel, X, RoutingKey, Payload, Mandatory, Properties) ->
publish_internal(fun amqp_channel:call/3, publish_internal(fun amqp_channel:call/3,
Channel, X, RoutingKey, Payload, Mandatory, Properties). Channel, X, RoutingKey, Payload, Mandatory, Properties).
@ -69,18 +97,23 @@ publish_internal(Fun, Channel, X, RoutingKey,
payload_fragments_rev = [Payload]}, payload_fragments_rev = [Payload]},
Fun(Channel, BasicPublish, Content). Fun(Channel, BasicPublish, Content).
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
close_channel(Channel) -> close_channel(Channel) ->
ChannelClose = #'channel.close'{reply_code = 200, reply_text = <<"Goodbye">>, ChannelClose = #'channel.close'{reply_code = 200,
class_id = 0, method_id = 0}, reply_text = <<"Goodbye">>,
class_id = 0,
method_id = 0},
#'channel.close_ok'{} = amqp_channel:call(Channel, ChannelClose), #'channel.close_ok'{} = amqp_channel:call(Channel, ChannelClose),
ok. ok.
close_connection(Connection) -> close_connection(Connection) ->
ConnectionClose = #'connection.close'{reply_code = 200, reply_text = <<"Goodbye">>, ConnectionClose = #'connection.close'{reply_code = 200,
class_id = 0, method_id = 0}, reply_text = <<"Goodbye">>,
#'connection.close_ok'{} = amqp_connection:close(Connection, ConnectionClose), class_id = 0,
method_id = 0},
#'connection.close_ok'{} = amqp_connection:close(Connection,
ConnectionClose),
ok. ok.
teardown(Connection, Channel) -> teardown(Connection, Channel) ->
@ -121,13 +154,13 @@ subscribe(Channel, Q, Consumer, Tag, NoAck) ->
consumer_tag = Tag, consumer_tag = Tag,
no_local = false, no_ack = NoAck, no_local = false, no_ack = NoAck,
exclusive = false, nowait = false}, exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = #'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:subscribe(Channel,BasicConsume, Consumer), amqp_channel:subscribe(Channel, BasicConsume, Consumer),
ConsumerTag. ConsumerTag.
unsubscribe(Channel, Tag) -> unsubscribe(Channel, Tag) ->
BasicCancel = #'basic.cancel'{consumer_tag = Tag, nowait = false}, BasicCancel = #'basic.cancel'{consumer_tag = Tag, nowait = false},
#'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel), #'basic.cancel_ok'{} = amqp_channel:call(Channel, BasicCancel),
ok. ok.
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
@ -174,10 +207,12 @@ delete_queue(Channel, Q) ->
bind_queue(Channel, X, Q, Binding) -> bind_queue(Channel, X, Q, Binding) ->
QueueBind = #'queue.bind'{queue = Q, exchange = X, QueueBind = #'queue.bind'{queue = Q, exchange = X,
routing_key = Binding, nowait = false, arguments = []}, routing_key = Binding,
nowait = false, arguments = []},
#'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind). #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind).
unbind_queue(Channel, X, Q, Binding) -> unbind_queue(Channel, X, Q, Binding) ->
Unbind = #'queue.unbind'{queue = Q, exchange = X, Unbind = #'queue.unbind'{queue = Q, exchange = X,
routing_key = Binding, arguments = []}, routing_key = Binding, arguments = []},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Unbind). #'queue.unbind_ok'{} = amqp_channel:call(Channel, Unbind).

View File

@ -35,7 +35,7 @@ non_existent_exchange_test(Connection) ->
Payload = <<"foobar">>, Payload = <<"foobar">>,
Channel = lib_amqp:start_channel(Connection), Channel = lib_amqp:start_channel(Connection),
lib_amqp:declare_exchange(Channel, X), lib_amqp:declare_exchange(Channel, X),
% Deliberately mix up the routingkey and exchange arguments %% Deliberately mix up the routingkey and exchange arguments
lib_amqp:publish(Channel, RoutingKey, X, Payload), lib_amqp:publish(Channel, RoutingKey, X, Payload),
receive receive
X -> ok X -> ok
@ -43,4 +43,4 @@ non_existent_exchange_test(Connection) ->
end, end,
?assertNot(is_process_alive(Channel)), ?assertNot(is_process_alive(Channel)),
?assert(is_process_alive(Connection)), ?assert(is_process_alive(Connection)),
lib_amqp:close_connection(Connection). lib_amqp:close_connection(Connection).

View File

@ -30,34 +30,34 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
basic_get_test() -> basic_get_test() ->
test_util:basic_get_test(new_connection()). test_util:basic_get_test(new_connection()).
basic_return_test() -> basic_return_test() ->
test_util:basic_return_test(new_connection()). test_util:basic_return_test(new_connection()).
basic_qos_test() -> basic_qos_test() ->
test_util:basic_qos_test(new_connection()). test_util:basic_qos_test(new_connection()).
basic_recover_test() -> basic_recover_test() ->
test_util:basic_recover_test(new_connection()). test_util:basic_recover_test(new_connection()).
basic_consume_test() -> basic_consume_test() ->
test_util:basic_consume_test(new_connection()). test_util:basic_consume_test(new_connection()).
lifecycle_test() -> lifecycle_test() ->
test_util:lifecycle_test(new_connection()). test_util:lifecycle_test(new_connection()).
basic_ack_test() -> basic_ack_test() ->
test_util:basic_ack_test(new_connection()). test_util:basic_ack_test(new_connection()).
channel_lifecycle_test() -> channel_lifecycle_test() ->
test_util:channel_lifecycle_test(new_connection()). test_util:channel_lifecycle_test(new_connection()).
queue_unbind_test() -> queue_unbind_test() ->
test_util:queue_unbind_test(new_connection()). test_util:queue_unbind_test(new_connection()).
command_serialization_test() -> command_serialization_test() ->
test_util:command_serialization_test(new_connection()). test_util:command_serialization_test(new_connection()).
teardown_test() -> teardown_test() ->
test_util:teardown_test(new_connection()). test_util:teardown_test(new_connection()).
@ -65,14 +65,14 @@ teardown_test() ->
rpc_test() -> rpc_test() ->
test_util:rpc_test(new_connection()). test_util:rpc_test(new_connection()).
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% Negative Tests %% Negative Tests
non_existent_exchange_test() -> non_existent_exchange_test() ->
negative_test_util:non_existent_exchange_test(new_connection()). negative_test_util:non_existent_exchange_test(new_connection()).
%---------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Common Functions %% Common Functions
new_connection() -> new_connection() ->
amqp_connection:start("guest", "guest", "localhost"). amqp_connection:start("guest", "guest", "localhost").

View File

@ -32,37 +32,40 @@
-compile([export_all]). -compile([export_all]).
-record(publish,{q, x, routing_key, bind_key, payload, -record(publish, {q, x, routing_key, bind_key, payload,
mandatory = false, immediate = false}). mandatory = false, immediate = false}).
% The latch constant defines how many processes are spawned in order %% The latch constant defines how many processes are spawned in order
% to run certain functionality in parallel. It follows the standard %% to run certain functionality in parallel. It follows the standard
% countdown latch pattern. %% countdown latch pattern.
-define(Latch, 100). -define(Latch, 100).
% The wait constant defines how long a consumer waits before it %% The wait constant defines how long a consumer waits before it
% unsubscribes %% unsubscribes
-define(Wait, 200). -define(Wait, 200).
%%%% %%%%
% %%
% This is an example of how the client interaction should work %% This is an example of how the client interaction should work
% %%
% Connection = amqp_connection:start(User, Password, Host), %% Connection = amqp_connection:start(User, Password, Host),
% Channel = amqp_connection:open_channel(Connection), %% Channel = amqp_connection:open_channel(Connection),
% %%...do something useful %% %%...do something useful
% ChannelClose = #'channel.close'{ %% set the appropriate fields }, %% ChannelClose = #'channel.close'{ %% set the appropriate fields },
% amqp_channel:call(Channel, ChannelClose), %% amqp_channel:call(Channel, ChannelClose),
% ConnectionClose = #'connection.close'{ %% set the appropriate fields }, %% ConnectionClose = #'connection.close'{ %% set the appropriate fields },
% amqp_connection:close(Connection, ConnectionClose). %% amqp_connection:close(Connection, ConnectionClose).
% %%
lifecycle_test(Connection) -> lifecycle_test(Connection) ->
X = <<"x">>, X = <<"x">>,
Channel = lib_amqp:start_channel(Connection), Channel = lib_amqp:start_channel(Connection),
lib_amqp:declare_exchange(Channel, X, <<"topic">>), lib_amqp:declare_exchange(Channel, X, <<"topic">>),
Parent = self(), Parent = self(),
[spawn(fun() -> queue_exchange_binding(Channel, X, Parent, Tag) end) || Tag <- lists:seq(1,?Latch)], [spawn(
fun() ->
queue_exchange_binding(Channel, X, Parent, Tag) end)
|| Tag <- lists:seq(1, ?Latch)],
latch_loop(?Latch), latch_loop(?Latch),
lib_amqp:delete_exchange(Channel, X), lib_amqp:delete_exchange(Channel, X),
lib_amqp:teardown(Connection, Channel), lib_amqp:teardown(Connection, Channel),
@ -74,7 +77,7 @@ queue_exchange_binding(Channel, X, Parent, Tag) ->
after (?Latch - Tag rem 7) * 10 -> after (?Latch - Tag rem 7) * 10 ->
ok ok
end, end,
Q = <<"a.b.c",Tag:32>>, Q = <<"a.b.c", Tag:32>>,
Binding = <<"a.b.c.*">>, Binding = <<"a.b.c.*">>,
Q1 = lib_amqp:declare_queue(Channel, Q), Q1 = lib_amqp:declare_queue(Channel, Q),
?assertMatch(Q, Q1), ?assertMatch(Q, Q1),
@ -89,17 +92,17 @@ channel_lifecycle_test(Connection) ->
lib_amqp:teardown(Connection, Channel2), lib_amqp:teardown(Connection, Channel2),
ok. ok.
% This is designed to exercize the internal queuing mechanism %% This is designed to exercize the internal queuing mechanism
% to ensure that commands are properly serialized %% to ensure that commands are properly serialized
command_serialization_test(Connection) -> command_serialization_test(Connection) ->
Channel = lib_amqp:start_channel(Connection), Channel = lib_amqp:start_channel(Connection),
Parent = self(), Parent = self(),
[spawn(fun() -> [spawn(fun() ->
Q = uuid(), Q = uuid(),
Q1 = lib_amqp:declare_queue(Channel, Q), Q1 = lib_amqp:declare_queue(Channel, Q),
?assertMatch(Q, Q1), ?assertMatch(Q, Q1),
Parent ! finished Parent ! finished
end) || _ <- lists:seq(1,?Latch)], end) || _ <- lists:seq(1, ?Latch)],
latch_loop(?Latch), latch_loop(?Latch),
lib_amqp:teardown(Connection, Channel). lib_amqp:teardown(Connection, Channel).
@ -120,7 +123,7 @@ queue_unbind_test(Connection) ->
get_and_assert_empty(Channel, Q) -> get_and_assert_empty(Channel, Q) ->
BasicGetEmpty = lib_amqp:get(Channel, Q, false), BasicGetEmpty = lib_amqp:get(Channel, Q, false),
?assertMatch('basic.get_empty', BasicGetEmpty). ?assertMatch('basic.get_empty', BasicGetEmpty).
get_and_assert_equals(Channel, Q, Payload) -> get_and_assert_equals(Channel, Q, Payload) ->
Content = lib_amqp:get(Channel, Q), Content = lib_amqp:get(Channel, Q),
#content{payload_fragments_rev = PayloadFragments} = Content, #content{payload_fragments_rev = PayloadFragments} = Content,
@ -129,8 +132,8 @@ get_and_assert_equals(Channel, Q, Payload) ->
basic_get_test(Connection) -> basic_get_test(Connection) ->
Channel = lib_amqp:start_channel(Connection), Channel = lib_amqp:start_channel(Connection),
{ok, Q} = setup_publish(Channel), {ok, Q} = setup_publish(Channel),
% TODO: This could be refactored to use get_and_assert_equals, %% TODO: This could be refactored to use get_and_assert_equals,
% get_and_assert_empty .... would require another bug though :-) %% get_and_assert_empty .... would require another bug though :-)
Content = lib_amqp:get(Channel, Q), Content = lib_amqp:get(Channel, Q),
#content{payload_fragments_rev = PayloadFragments} = Content, #content{payload_fragments_rev = PayloadFragments} = Content,
?assertMatch([<<"foobar">>], PayloadFragments), ?assertMatch([<<"foobar">>], PayloadFragments),
@ -158,7 +161,7 @@ basic_return_test(Connection) ->
?assertMatch([Payload], Payload2); ?assertMatch([Payload], Payload2);
WhatsThis -> WhatsThis ->
%% TODO investigate where this comes from %% TODO investigate where this comes from
io:format("Spurious message ~p~n",[WhatsThis]) io:format("Spurious message ~p~n", [WhatsThis])
after 2000 -> after 2000 ->
exit(no_return_received) exit(no_return_received)
end, end,
@ -193,7 +196,7 @@ consume_loop(Channel, X, RoutingKey, Parent, Tag) ->
receive receive
#'basic.consume_ok'{consumer_tag = Tag} -> ok #'basic.consume_ok'{consumer_tag = Tag} -> ok
end, end,
receive receive
{#'basic.deliver'{}, _} -> ok {#'basic.deliver'{}, _} -> ok
end, end,
lib_amqp:unsubscribe(Channel, Tag), lib_amqp:unsubscribe(Channel, Tag),
@ -221,7 +224,7 @@ basic_recover_test(Connection) ->
exit(did_not_receive_first_message) exit(did_not_receive_first_message)
end, end,
BasicRecover = #'basic.recover'{requeue = true}, BasicRecover = #'basic.recover'{requeue = true},
amqp_channel:cast(Channel,BasicRecover), amqp_channel:cast(Channel, BasicRecover),
receive receive
{#'basic.deliver'{delivery_tag = DeliveryTag2}, _} -> {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
lib_amqp:ack(Channel, DeliveryTag2) lib_amqp:ack(Channel, DeliveryTag2)
@ -230,27 +233,28 @@ basic_recover_test(Connection) ->
end, end,
lib_amqp:teardown(Connection, Channel). lib_amqp:teardown(Connection, Channel).
% QOS is not yet implemented in RabbitMQ %% QOS is not yet implemented in RabbitMQ
basic_qos_test(Connection) -> basic_qos_test(Connection) ->
lib_amqp:close_connection(Connection). lib_amqp:close_connection(Connection).
% Reject is not yet implemented in RabbitMQ %% Reject is not yet implemented in RabbitMQ
basic_reject_test(Connection) -> basic_reject_test(Connection) ->
lib_amqp:close_connection(Connection). lib_amqp:close_connection(Connection).
%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
% Unit test for the direct client %% Unit test for the direct client
% This just relies on the fact that a fresh Rabbit VM must consume more than %% This just relies on the fact that a fresh Rabbit VM must consume more than
% 0.1 pc of the system memory: %% 0.1 pc of the system memory:
% 0. Wait 1 minute to let memsup do stuff %% 0. Wait 1 minute to let memsup do stuff
% 1. Make sure that the high watermark is set high %% 1. Make sure that the high watermark is set high
% 2. Start a process to receive the pause and resume commands from the broker %% 2. Start a process to receive the pause and resume commands from the broker
% 3. Register this as flow control notification handler %% 3. Register this as flow control notification handler
% 4. Let the system settle for a little bit %% 4. Let the system settle for a little bit
% 5. Set the threshold to the lowest possible value %% 5. Set the threshold to the lowest possible value
% 6. When the flow handler receives the pause command, it sets the watermark %% 6. When the flow handler receives the pause command, it sets the watermark
% to a high value in order to get the broker to send the resume command %% to a high value in order to get the broker to send the resume command
% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and fail %% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and
%% fail
channel_flow_test(Connection) -> channel_flow_test(Connection) ->
X = <<"amq.direct">>, X = <<"amq.direct">>,
K = Payload = <<"x">>, K = Payload = <<"x">>,
@ -280,9 +284,9 @@ channel_flow_test(Connection) ->
exit(did_not_receive_channel_flow) exit(did_not_receive_channel_flow)
end. end.
%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
% This is a test, albeit not a unit test, to see if the producer %% This is a test, albeit not a unit test, to see if the producer
% handles the effect of being throttled. %% handles the effect of being throttled.
channel_flow_sync(Connection) -> channel_flow_sync(Connection) ->
start_channel_flow(Connection, fun lib_amqp:publish/4). start_channel_flow(Connection, fun lib_amqp:publish/4).
@ -324,21 +328,21 @@ cf_consumer_loop(Channel, Tag) ->
ok ok
end. end.
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) cf_producer_loop(Channel, X, Key, PublishFun, Payload, N)
when N rem 5000 =:= 0 -> when N rem 5000 =:= 0 ->
io:format("Producer (~p) has sent about ~p messages since it started~n", io:format("Producer (~p) has sent about ~p messages since it started~n",
[self(), N]), [self(), N]),
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1); cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1);
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) -> cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) ->
case PublishFun(Channel, X, Key, Payload) of case PublishFun(Channel, X, Key, Payload) of
blocked -> blocked ->
io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n", io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n",
[self()]), [self()]),
receive receive
resume -> resume ->
io:format("Producer (~p) has woken up :-)~n", [self()]), io:format("Producer (~p) has woken up :-)~n", [self()]),
cf_producer_loop(Channel, X, Key, cf_producer_loop(Channel, X, Key,
PublishFun, Payload, N + 1) PublishFun, Payload, N + 1)
end; end;
_ -> _ ->
@ -358,9 +362,9 @@ cf_handler_loop(Producer) ->
stop -> ok stop -> ok
end. end.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
% This tests whether RPC over AMQP produces the same result as invoking the %% This tests whether RPC over AMQP produces the same result as invoking the
% same argument against the same underlying gen_server instance. %% same argument against the same underlying gen_server instance.
rpc_test(Connection) -> rpc_test(Connection) ->
Q = uuid(), Q = uuid(),
Fun = fun(X) -> X + 1 end, Fun = fun(X) -> X + 1 end,
@ -376,7 +380,7 @@ rpc_test(Connection) ->
amqp_rpc_server:stop(Server), amqp_rpc_server:stop(Server),
ok. ok.
%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
setup_publish(Channel) -> setup_publish(Channel) ->
Publish = #publish{routing_key = <<"a.b.c.d">>, Publish = #publish{routing_key = <<"a.b.c.d">>,
@ -409,7 +413,9 @@ setup_exchange(Channel, Q, X, Binding) ->
lib_amqp:bind_queue(Channel, X, Q, Binding), lib_amqp:bind_queue(Channel, X, Q, Binding),
ok. ok.
latch_loop(0) -> ok; latch_loop(0) ->
ok;
latch_loop(Latch) -> latch_loop(Latch) ->
receive receive
finished -> finished ->
@ -420,5 +426,5 @@ latch_loop(Latch) ->
uuid() -> uuid() ->
{A, B, C} = now(), {A, B, C} = now(),
<<A:32,B:32,C:32>>. <<A:32, B:32, C:32>>.