merging in from default

This commit is contained in:
Vlad Alexandru Ionescu 2010-09-14 12:57:32 +01:00
commit 78c6bc3e90
17 changed files with 247 additions and 261 deletions

View File

@ -24,6 +24,7 @@
#
VERSION=0.0.0
SOURCE_PACKAGE_DIR=$(PACKAGE)-$(VERSION)-src
SOURCE_PACKAGE_TAR_GZ=$(SOURCE_PACKAGE_DIR).tar.gz
@ -139,4 +140,4 @@ source_tarball: $(DIST_DIR)/$(COMMON_PACKAGE_EZ) $(EBIN_DIR)/$(PACKAGE).app | $(
cd $(DIST_DIR) ; tar cvzf $(SOURCE_PACKAGE_TAR_GZ) $(SOURCE_PACKAGE_DIR)
$(DIST_DIR):
mkdir -p $@
mkdir -p $@

1
deps/amqp_client/README vendored Normal file → Executable file
View File

@ -255,3 +255,4 @@ all_tests
network_client_SUITE, direct_client_SUITE and packaging tests. During the
testing, this make target runs an instance of the broker, so make sure
there is no other instance of RabbitMQ server running.

View File

@ -81,8 +81,8 @@ TEST_SOURCES=$(wildcard $(TEST_DIR)/*.erl)
TEST_TARGETS=$(patsubst $(TEST_DIR)/%.erl, $(TEST_DIR)/%.beam, $(TEST_SOURCES))
LIBS_PATH_UNIX=$(DEPS_DIR):$(DIST_DIR)$(ERL_LIBS)
OSTYPE=$(shell uname -o)
ifeq ($(OSTYPE),Cygwin)
IS_CYGWIN=$(shell if [ $(shell expr "$(shell uname -s)" : 'CYGWIN_NT') -gt 0 ]; then echo "true"; else echo "false"; fi)
ifeq ($(IS_CYGWIN),true)
LIBS_PATH=ERL_LIBS="$(shell cygpath -wp $(LIBS_PATH_UNIX))"
else
LIBS_PATH=ERL_LIBS=$(LIBS_PATH_UNIX)

View File

@ -23,12 +23,11 @@
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
%% @doc This module encapsulates the client's view of an AMQP channel. Each
%% server side channel is represented by an amqp_channel process on the client
%% side. Channel processes are created using the {@link amqp_connection}
%% module, but channels are respsonsible for closing themselves. Channel
%% processes are linked to the connnection process from which they were
%% created.
%% @doc This module encapsulates the client's view of an AMQP
%% channel. Each server side channel is represented by an amqp_channel
%% process on the client side. Channel processes are created using the
%% {@link amqp_connection} module. Channel processes are supervised
%% under amqp_client's supervision tree.
-module(amqp_channel).
-include("amqp_client.hrl").
@ -51,104 +50,96 @@
-record(state, {number,
sup,
driver,
rpc_requests = queue:new(),
anon_sub_requests = queue:new(),
rpc_requests = queue:new(),
anon_sub_requests = queue:new(),
tagged_sub_requests = dict:new(),
closing = false,
closing = false,
writer,
return_handler_pid = none,
flow_control = false,
flow_handler_pid = none,
consumers = dict:new(),
default_consumer = none,
start_writer_fun}).
%% This diagram shows the interaction between the different component
%% processes in an AMQP client scenario.
%%
%% message* / reply* +-------+
%% +---------------------- | queue |
%% | +-------+
%% |
%% | +-----+
%% v | |
%% request reply* | v
%% +------+ -------+ +--------------+ <------+ +----------------+
%% | User | | | amqp_channel | | | direct_channel |
%% +------+ <------+ +--------------+ -------+ +----------------+
%% response / | request
%% cast/call / |
%% / | message
%% / v
%% +-------------+/ +----------+
%% | Pending RPC | | Consumer |
%% +-------------+ +----------+
%% |
%% [consumer tag --> consumer pid]
%%
%% These notifications are processed asynchronously via
%% handle_info/2 callbacks
return_handler_pid = none,
flow_control = false,
flow_handler_pid = none,
consumers = dict:new(),
default_consumer = none,
start_writer_fun
}).
%%---------------------------------------------------------------------------
%% Type Definitions
%%---------------------------------------------------------------------------
%% @type amqp_command().
%% This abstract datatype represents the set of commands that comprise the
%% AMQP execution model. As indicated in the overview, the attributes of each
%% commands in the execution model are described in the protocol
%% documentation. The Erlang record definitions are autogenerated from a
%% parseable version of the specification.
%% @type amqp_method().
%% This abstract datatype represents the set of methods that comprise
%% the AMQP execution model. As indicated in the overview, the
%% attributes of each method in the execution model are described in
%% the protocol documentation. The Erlang record definitions are
%% autogenerated from a parseable version of the specification. Most
%% fields in the generated records have sensible default values that
%% you need not worry in the case of a simple usage of the client
%% library.
%% @type content() = #'basic.publish'{} |
%% #'basic.deliver'{} |
%% #'basic.return'{}.
%% These are the content bearing AMQP commands.
%% @type amqp_msg() = #amqp_msg{}.
%% This is the content encapsulated in content-bearing AMQP methods. It
%% contains the following fields:
%% <ul>
%% <li>props :: class_property() - A class property record, defaults to
%% #'P_basic'{}</li>
%% <li>payload :: binary() - The arbitrary data payload</li>
%% </ul>
%%---------------------------------------------------------------------------
%% AMQP Channel API methods
%%---------------------------------------------------------------------------
%% @spec (Channel, amqp_command()) -> amqp_command()
%% where
%% Channel = pid()
%% @doc This is a generic RPC mechanism that sends an AMQP command and
%% receives an AMQP command as a response. This function blocks until the
%% response is returned.
%% @spec (Channel, Method) -> Result
%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
call(Channel, Method) ->
gen_server:call(Channel, {call, Method, none}, infinity).
%% @spec (Channel, amqp_command(), content()) -> ok | blocked | closing
%% @spec (Channel, Method, Content) -> Result
%% where
%% Channel = pid()
%% @doc This sends an AMQP command with content and waits for a synchronous
%% response. Generally this is used with the #basic.publish{} command.
%% This will return a blocked atom if either the server has throttled the
%% client for flow control reasons or if the channel is shutting down due to a
%% broker initiated close.
%% It will return a closing atom if the channel is in the process of shutting
%% down.
%% Note that the synchronicity only means that the client has transmitted the
%% command to the broker. It does not imply that the broker has accepted
%% responsibility for the message. To acheive guaranteed delivery, this
%% function would have to be called within the context of a transaction.
%% Method = amqp_method()
%% Content = amqp_msg() | none
%% Result = amqp_method() | ok | blocked | closing
%% @doc This sends an AMQP method on the channel.
%% For content bearing methods, Content has to be an amqp_msg(), whereas
%% for non-content bearing methods, it needs to be the atom 'none'.<br/>
%% In the case of synchronous methods, this function blocks until the
%% corresponding reply comes back from the server and returns it.
%% In the case of asynchronous methods, the function blocks until the method
%% gets sent on the wire and returns the atom 'ok' on success.<br/>
%% This will return the atom 'blocked' if the server has
%% throttled the client for flow control reasons. This will return the
%% atom 'closing' if the channel is in the process of shutting down.<br/>
%% Note that for asynchronous methods, the synchronicity implied by
%% 'call' only means that the client has transmitted the method to
%% the broker. It does not necessarily imply that the broker has
%% accepted responsibility for the message.
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content}, infinity).
%% @spec (Channel, amqp_command()) -> ok
%% @doc Asynchronous variant of {@link call/2}
%% @spec (Channel, Method) -> ok
%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
cast(Channel, Method) ->
gen_server:cast(Channel, {cast, Method, none}).
%% @spec (Channel, amqp_command(), content()) -> ok
%% @doc Asynchronous variant of {@link call/3}
%% @spec (Channel, Method, Content) -> ok
%% where
%% Channel = pid()
%% Method = amqp_method()
%% Content = amqp_msg() | none
%% @doc This function is the same as {@link call/3}, except that it returns
%% immediately with the atom 'ok', without blocking the caller process.
%% This function is not recommended with synchronous methods, since there is no
%% way to verify that the server has received the method.
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}).
%% @spec (Channel) -> ok
%% where
%% Channel = pid()
%% @doc Closes the channel, invokes close(Channel, 200, &lt;&lt;"Goodbye">>).
%% @doc Closes the channel, invokes
%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
close(Channel) ->
close(Channel, 200, <<"Goodbye">>).
@ -160,7 +151,7 @@ close(Channel) ->
%% @doc Closes the channel, allowing the caller to supply a reply code and
%% text.
close(Channel, Code, Text) ->
Close = #'channel.close'{reply_text = Text,
Close = #'channel.close'{reply_text = Text,
reply_code = Code,
class_id = 0,
method_id = 0},
@ -174,13 +165,13 @@ close(Channel, Code, Text) ->
%%---------------------------------------------------------------------------
%% @type consume() = #'basic.consume'{}.
%% The AMQP command that is used to subscribe a consumer to a queue.
%% @spec (Channel, consume(), Consumer) -> amqp_command()
%% The AMQP method that is used to subscribe a consumer to a queue.
%% @spec (Channel, consume(), Consumer) -> amqp_method()
%% where
%% Channel = pid()
%% Consumer = pid()
%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
%% the queue defined in the #'basic.consume'{} command record. Note that both
%% the queue defined in the #'basic.consume'{} method record. Note that
%% both the process invoking this method and the supplied consumer process
%% receive an acknowledgement of the subscription. The calling process will
%% receive the acknowledgement as the return value of this function, whereas
@ -192,8 +183,8 @@ subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
%% where
%% Channel = pid()
%% ReturnHandler = pid()
%% @doc This registers a handler to deal with returned messages. The
%% registered process will receive #basic.return{} commands.
%% @doc This registers a handler to deal with returned messages. The
%% registered process will receive #basic.return{} records.
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
@ -202,7 +193,7 @@ register_return_handler(Channel, ReturnHandler) ->
%% Channel = pid()
%% FlowHandler = pid()
%% @doc This registers a handler to deal with channel flow notifications.
%% The registered process will receive #channel.flow{} commands.
%% The registered process will receive #channel.flow{} records.
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
@ -214,22 +205,20 @@ register_flow_handler(Channel, FlowHandler) ->
%% Under certain circumstances it is possible for a channel to receive a
%% message delivery which does not match any consumer which is currently
%% set up via basic.consume. This will occur after the following sequence
%% of events:
%%
%% basic.consume with explicit acks
%% %% some deliveries take place but are not acked
%% basic.cancel
%% basic.recover{requeue = false}
%%
%% of events:<br/>
%% <br/>
%% basic.consume with explicit acks<br/>
%% %% some deliveries take place but are not acked<br/>
%% basic.cancel<br/>
%% basic.recover{requeue = false}<br/>
%% <br/>
%% Since requeue is specified to be false in the basic.recover, the spec
%% states that the message must be redelivered to "the original recipient"
%% - i.e. the same channel / consumer-tag. But the consumer is no longer
%% active.
%%
%% active.<br/>
%% In these circumstances, you can register a default consumer to handle
%% such deliveries. If no default consumer is registered then the channel
%% will exit on receiving such a delivery.
%%
%% will exit on receiving such a delivery.<br/>
%% Most people will not need to use this.
register_default_consumer(Channel, Consumer) ->
gen_server:cast(Channel, {register_default_consumer, Consumer}).
@ -238,7 +227,7 @@ register_default_consumer(Channel, Consumer) ->
%% RPC mechanism
%%---------------------------------------------------------------------------
rpc_top_half(Method, Content, From,
rpc_top_half(Method, Content, From,
State0 = #state{rpc_requests = RequestQueue}) ->
State1 = State0#state{
rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
@ -248,21 +237,19 @@ rpc_top_half(Method, Content, From,
end.
rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
case queue:out(RequestQueue) of
{{value, {From, _Method, _Content}}, NewRequestQueue} ->
case From of none -> ok;
_ -> gen_server:reply(From, Reply)
end,
do_rpc(State#state{rpc_requests = NewRequestQueue});
{empty, _} ->
exit(empty_rpc_bottom_half)
end.
{{value, {From, _Method, _Content}}, RequestQueue1} =
queue:out(RequestQueue),
case From of
none -> ok;
_ -> gen_server:reply(From, Reply)
end,
do_rpc(State#state{rpc_requests = RequestQueue1}).
do_rpc(State0 = #state{rpc_requests = RequestQueue,
closing = Closing}) ->
do_rpc(State = #state{rpc_requests = RequestQueue,
closing = Closing}) ->
case queue:peek(RequestQueue) of
{value, {_From, Method, Content}} ->
State1 = pre_do(Method, Content, State0),
State1 = pre_do(Method, Content, State),
do(Method, Content, State1),
State1;
empty ->
@ -272,7 +259,7 @@ do_rpc(State0 = #state{rpc_requests = RequestQueue,
_ ->
ok
end,
State0
State
end.
pre_do(#'channel.open'{}, _Content, State) ->
@ -390,19 +377,15 @@ handle_method(Method, Content, State = #state{closing = Closing}) ->
handle_regular_method(
#'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk, none,
#state{tagged_sub_requests = Tagged,
anon_sub_requests = Anon} = State) ->
{_From, Consumer, State0} =
anon_sub_requests = Anon} = State) ->
{Consumer, State0} =
case dict:find(ConsumerTag, Tagged) of
{ok, {F, C}} ->
NewTagged = dict:erase(ConsumerTag,Tagged),
{F, C, State#state{tagged_sub_requests = NewTagged}};
{ok, C} ->
NewTagged = dict:erase(ConsumerTag, Tagged),
{C, State#state{tagged_sub_requests = NewTagged}};
error ->
case queue:out(Anon) of
{empty, _} ->
exit({anonymous_queue_empty, ConsumerTag});
{{value, {F, C}}, NewAnon} ->
{F, C, State#state{anon_sub_requests = NewAnon}}
end
{{value, C}, NewAnon} = queue:out(Anon),
{C, State#state{anon_sub_requests = NewAnon}}
end,
Consumer ! ConsumeOk,
State1 = register_consumer(ConsumerTag, Consumer, State0),
@ -464,9 +447,9 @@ start_link(Driver, ChannelNumber, SWF) ->
%% @private
init([Sup, Driver, ChannelNumber, SWF]) ->
{ok, #state{sup = Sup,
driver = Driver,
number = ChannelNumber,
{ok, #state{sup = Sup,
driver = Driver,
number = ChannelNumber,
start_writer_fun = SWF}}.
%% Standard implementation of the call/{2,3} command
@ -498,16 +481,15 @@ handle_call({subscribe, #'basic.consume'{consumer_tag = Tag} = Method, Consumer}
ok ->
{NewMethod, NewState} =
if Tag =:= undefined orelse size(Tag) == 0 ->
NewAnon = queue:in({From,Consumer}, Anon),
NewAnon = queue:in(Consumer, Anon),
{Method#'basic.consume'{consumer_tag = <<"">>},
State#state{anon_sub_requests = NewAnon}};
is_binary(Tag) ->
%% TODO test whether this tag already exists, either in
%% the pending tagged request map or in general as
%% already subscribed consumer
NewTagged = dict:store(Tag,{From,Consumer}, Tagged),
{Method,
State#state{tagged_sub_requests = NewTagged}}
NewTagged = dict:store(Tag, Consumer, Tagged),
{Method, State#state{tagged_sub_requests = NewTagged}}
end,
{noreply, rpc_top_half(NewMethod, none, From, NewState)};
BlockReply ->

View File

@ -54,22 +54,20 @@ start_link(Type, InfraArgs, ChNumber) ->
start_writer_fun(Sup, direct, [User, VHost, Collector], ChNumber) ->
fun() ->
ChPid = self(),
{ok, RabbitChannel} = supervisor2:start_child(Sup,
{rabbit_channel, {rabbit_channel, start_link,
[ChNumber, ChPid, ChPid, User, VHost,
Collector, start_limiter_fun(Sup)]},
transient, ?MAX_WAIT, worker, [rabbit_channel]}),
RabbitChannel
{ok, _} = supervisor2:start_child(Sup,
{rabbit_channel, {rabbit_channel, start_link,
[ChNumber, ChPid, ChPid, User, VHost,
Collector, start_limiter_fun(Sup)]},
transient, ?MAX_WAIT, worker, [rabbit_channel]}),
end;
start_writer_fun(Sup, network, [Sock], ChNumber) ->
fun() ->
ChPid = self(),
{ok, Writer} = supervisor2:start_child(Sup,
{writer, {rabbit_writer, start_link,
[Sock, ChNumber, ?FRAME_MIN_SIZE,
?PROTOCOL, ChPid]},
transient, ?MAX_WAIT, worker, [rabbit_writer]}),
Writer
{ok, _} = supervisor2:start_child(Sup,
{writer, {rabbit_writer, start_link,
[Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
ChPid]},
transient, ?MAX_WAIT, worker, [rabbit_writer]}),
end.
start_framing(_Sup, direct, _ChPid) ->
@ -81,12 +79,13 @@ start_framing(Sup, network, ChPid) ->
transient, ?MAX_WAIT, worker, [rabbit_framing_channel]}).
start_limiter_fun(Sup) ->
fun(UnackedCount) ->
Parent = self(),
{ok, _} = supervisor2:start_child(Sup,
{limiter, {rabbit_limiter, start_link,
[Parent, UnackedCount]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]})
fun (UnackedCount) ->
Parent = self(),
{ok, _} = supervisor2:start_child(
Sup,
{limiter, {rabbit_limiter, start_link,
[Parent, UnackedCount]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]})
end.
%%---------------------------------------------------------------------------

View File

@ -25,9 +25,12 @@
%% @doc This module is responsible for maintaining a connection to an AMQP
%% broker and manages channels within the connection. This module is used to
%% open and close connections to the broker as well as creating new channels
%% within a connection. Each amqp_connection process maintains a mapping of
%% the channels that were created by that connection process. Each resulting
%% amqp_channel process is linked to the parent connection process.
%% within a connection.<br/>
%% The connections and channels created by this module are supervised under
%% amqp_client's supervision tree. Please note that connections and channels
%% do not get restarted automatically by the supervision tree in the case of a
%% failure. If you need robust connections and channels, we recommend you use
%% Erlang monitors on the returned connection and channel PID.
-module(amqp_connection).
-include("amqp_client.hrl").
@ -57,10 +60,20 @@
%% defaults to "localhost"</li>
%% <li>port :: integer() - The port the broker is listening on,
%% defaults to 5672</li>
%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
%% defaults to 0</li>
%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
%% defaults to 0</li>
%% <li>heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
%% defaults to 0 (turned off)</li>
%% <li>ssl_options :: term() - The second parameter to be used with the
%% ssl:connect/2 function, defaults to 'none'</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []</li>
%% </ul>
%%---------------------------------------------------------------------------
%% AMQP Connection API Methods
%% Starting a connection
%%---------------------------------------------------------------------------
%% @spec (Type) -> {ok, Connection} | {error, Error}
@ -86,10 +99,7 @@ start(Type) ->
%% a RabbitMQ server, assuming that the server is running in the same process
%% space.
start(Type, AmqpParams) ->
{ok, Sup} = amqp_connection_sup:start_link(Type, AmqpParams),
%% This unlink will disappear as part of bug 23003
unlink(Sup),
[Connection] = supervisor2:find_child(Sup, connection),
{ok, _Sup, Connection} = amqp_connection_sup:start_link(Type, AmqpParams),
Module = case Type of direct -> amqp_direct_connection;
network -> amqp_network_connection
end,
@ -111,14 +121,18 @@ start(Type, AmqpParams) ->
open_channel(ConnectionPid) ->
open_channel(ConnectionPid, none).
%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, term()}
%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
%% where
%% ChannelNumber = integer() | 'none'
%% ChannelNumber = pos_integer() | 'none'
%% ConnectionPid = pid()
%% ChannelPid = pid()
%% @doc Opens an AMQP channel.
%% @doc Opens an AMQP channel.<br/>
%% This function assumes that an AMQP connection (networked or direct)
%% has already been successfully established.
%% has already been successfully established.<br/>
%% ChannelNumber must be less than or equal to the negotiated max_channel value,
%% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
%% value is 0.<br/>
%% In the direct connection, max_channel is always 0.
open_channel(ConnectionPid, ChannelNumber) ->
case command(ConnectionPid, {open_channel, ChannelNumber}) of
{ok, ChannelPid} ->

View File

@ -38,14 +38,15 @@
start_link(Type, AmqpParams) ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
unlink(Sup),
{ok, ChSupSup} = supervisor2:start_child(Sup,
{channel_sup_sup, {amqp_channel_sup_sup, start_link,
[Type]},
intrinsic, infinity, supervisor,
[amqp_channel_sup_sup]}),
start_connection(Sup, Type, AmqpParams,
start_infrastructure_fun(Sup, Type, ChSupSup)),
{ok, Sup}.
{ok, Connection} = start_connection(Sup, Type, AmqpParams,
start_infrastructure_fun(Sup, Type, ChSupSup)),
{ok, Sup, Connection}.
%%---------------------------------------------------------------------------
%% Internal plumbing
@ -66,29 +67,25 @@ start_infrastructure_fun(Sup, network, ChSupSup) ->
fun(Sock) ->
Connection = self(),
{ok, ChMgr} = start_channels_manager(Sup, Connection, ChSupSup),
{ok, CTSup} = supervisor2:start_child(Sup,
{connection_type_sup, {amqp_connection_type_sup,
start_link_network,
[Sock, Connection, ChMgr]},
transient, infinity, supervisor,
[amqp_connection_type_sup]}),
[MainReader] = supervisor2:find_child(CTSup, main_reader),
[Framing] = supervisor2:find_child(CTSup, framing),
[Writer] = supervisor2:find_child(CTSup, writer),
{ChMgr, MainReader, Framing, Writer,
amqp_connection_type_sup:start_heartbeat_fun(CTSup)}
{ok, CTSup, {MainReader, Framing, Writer}} =
supervisor2:start_child(Sup,
{connection_type_sup, {amqp_connection_type_sup,
start_link_network,
[Sock, Connection, ChMgr]},
transient, infinity, supervisor, [amqp_connection_type_sup]}),
{ok, {ChMgr, MainReader, Framing, Writer,
amqp_connection_type_sup:start_heartbeat_fun(CTSup)}}
end;
start_infrastructure_fun(Sup, direct, ChSupSup) ->
fun() ->
Connection = self(),
{ok, ChMgr} = start_channels_manager(Sup, Connection, ChSupSup),
{ok, CTSup} = supervisor2:start_child(Sup,
{connection_type_sup, {amqp_connection_type_sup,
start_link_direct, []},
transient, infinity, supervisor,
[amqp_connection_type_sup]}),
[Collector] = supervisor2:find_child(CTSup, collector),
{ChMgr, Collector}
{ok, _CTSup, Collector} =
supervisor2:start_child(Sup,
{connection_type_sup, {amqp_connection_type_sup,
start_link_direct, []},
transient, infinity, supervisor, [amqp_connection_type_sup]}),
{ok, {ChMgr, Collector}}
end.
start_channels_manager(Sup, Connection, ChSupSup) ->

View File

@ -38,27 +38,30 @@
start_link_direct() ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
{ok, _} = supervisor2:start_child(Sup,
{collector, {rabbit_queue_collector, start_link, []},
transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
{ok, Sup}.
{ok, Collector} =
supervisor2:start_child(Sup,
{collector, {rabbit_queue_collector, start_link, []},
transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
{ok, Sup, Collector}.
start_link_network(Sock, Connection, ChMgr) ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
{ok, Framing0} = supervisor2:start_child(Sup,
{framing, {rabbit_framing_channel, start_link,
[Connection, Connection, ?PROTOCOL]},
transient, ?MAX_WAIT, worker,
[rabbit_framing_channel]}),
{ok, _} = supervisor2:start_child(Sup,
{writer, {rabbit_writer, start_link,
[Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Connection]},
transient, ?MAX_WAIT, worker, [rabbit_writer]}),
{ok, _} = supervisor2:start_child(Sup,
{main_reader, {amqp_main_reader, start_link,
[Sock, Connection, ChMgr, Framing0]},
transient, ?MAX_WAIT, worker, [amqp_main_reader]}),
{ok, Sup}.
{ok, Framing} =
supervisor2:start_child(Sup,
{framing, {rabbit_framing_channel, start_link,
[Connection, Connection, ?PROTOCOL]},
transient, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
{ok, Writer} =
supervisor2:start_child(Sup,
{writer, {rabbit_writer, start_link,
[Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Connection]},
transient, ?MAX_WAIT, worker, [rabbit_writer]}),
{ok, MainReader} =
supervisor2:start_child(Sup,
{main_reader, {amqp_main_reader, start_link,
[Sock, Connection, ChMgr, Framing0]},
transient, ?MAX_WAIT, worker, [amqp_main_reader]}),
{ok, Sup, {MainReader, Framing, Writer}}.
start_heartbeat_fun(Sup) ->
fun(_Sock, 0) ->

View File

@ -44,7 +44,7 @@
-record(closing, {reason,
close = none, %% At least one of close and reply has to be
reply = none, %% none at any given moment
from = none}).
from = none}).
-define(INFO_KEYS,
(amqp_connection:info_keys() ++ [])).
@ -217,9 +217,8 @@ do_connect(State0 = #state{params = #amqp_params{username = User,
rabbit_access_control:check_vhost_access(
#user{username = User, password = Pass}, VHost),
State1 = start_infrastructure(State0),
ServerProperties = rabbit_reader:server_properties(),
State1#state{server_properties = ServerProperties}.
State1#state{server_properties = rabbit_reader:server_properties()}.
start_infrastructure(State = #state{start_infrastructure_fun = SIF}) ->
{ChMgr, Collector} = SIF(),
{ok, {ChMgr, Collector}} = SIF(),
State#state{channels_manager = ChMgr, collector = Collector}.

View File

@ -89,12 +89,9 @@ handle_inet_async({inet_async, Sock, _, Msg},
end,
case Msg of
{ok, <<Payload:Length/binary, ?FRAME_END>>} ->
case handle_frame(Type, Channel, Payload, State) of
closed_ok -> {stop, normal, State};
_ -> {ok, _Ref} =
rabbit_net:async_recv(Sock, 7, infinity),
{noreply, State#state{message = none}}
end;
handle_frame(Type, Channel, Payload, State),
{ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
{noreply, State#state{message = none}};
{ok, <<NewType:8, NewChannel:16, NewLength:32>>} ->
{ok, _Ref} = rabbit_net:async_recv(Sock, NewLength + 1, infinity),
{noreply, State#state{message={NewType, NewChannel, NewLength}}};
@ -109,16 +106,9 @@ handle_frame(Type, Channel, Payload, State) ->
case rabbit_reader:analyze_frame(Type, Payload, ?PROTOCOL) of
heartbeat when Channel /= 0 ->
rabbit_misc:die(frame_error);
trace when Channel /= 0 ->
rabbit_misc:die(frame_error);
%% Match heartbeats and trace frames, but don't do anything with them
%% Match heartbeats but don't do anything with them
heartbeat ->
heartbeat;
trace ->
trace;
{method, Method = 'connection.close_ok', none} ->
pass_frame(Channel, {method, Method}, State),
closed_ok;
AnalyzedFrame ->
pass_frame(Channel, AnalyzedFrame, State)
end.

View File

@ -116,7 +116,9 @@ handle_info({channel_exit, Framing0, Reason},
State = #state{framing0 = Framing0}) ->
{stop, {channel0_died, Reason}, State};
handle_info({channel_exit, 0, Reason},State) ->
{stop, {channel0_died, Reason}, State}.
{stop, {channel0_died, Reason}, State};
handle_info(timeout, State) ->
{stop, heartbeat_timeout, State}.
terminate(_Reason, _State) ->
ok.
@ -267,8 +269,9 @@ internal_error_closing() ->
class_id = 0,
method_id = 0}}.
handle_socket_closed(State = #state{closing =
Closing = #closing{phase = wait_socket_close}}) ->
handle_socket_closed(State = #state{
closing = Closing = #closing{
phase = wait_socket_close}}) ->
{stop, closing_to_reason(Closing), State};
handle_socket_closed(State) ->
{stop, socket_closed_unexpectedly, State}.
@ -315,7 +318,7 @@ handshake(State0 = #state{sock = Sock}) ->
start_infrastructure(State = #state{start_infrastructure_fun = SIF,
sock = Sock}) ->
{ChMgr, MainReader, Framing, Writer, SHF} = SIF(Sock),
{ok, {ChMgr, MainReader, Framing, Writer, SHF}} = SIF(Sock),
State#state{channels_manager = ChMgr,
main_reader = MainReader,
framing0 = Framing,
@ -379,8 +382,8 @@ negotiate_max_value(Client, Server) when Client =:= 0; Server =:= 0 ->
negotiate_max_value(Client, Server) ->
lists:min([Client, Server]).
start_ok(#state{params = #amqp_params{username = Username,
password = Password,
start_ok(#state{params = #amqp_params{username = Username,
password = Password,
client_properties = UserProps}}) ->
LoginTable = [{<<"LOGIN">>, longstr, Username},
{<<"PASSWORD">>, longstr, Password}],

View File

@ -1,19 +1,19 @@
@title AMQP Client for Erlang
@author LShift Ltd. <query@lshift.net>
@copyright 2009 LShift Ltd.
@author Rabbit Technologies Ltd. <support@rabbitmq.com>
@copyright 2010 Rabbit Technologies Ltd.
@version %%VERSION%%
@reference <a href="http://jira.amqp.org/confluence/download/attachments/720900/amqp0-8.pdf?version=1">AMQP Specification</a>, the 0-8 version of the AMQP specification
@reference <a href="http://www.amqp.org/confluence/download/attachments/720900/amqp0-9-1.pdf?version=1">AMQP Specification</a>, the 0-9-1 version of the AMQP specification
@reference <a
href="http://jira.amqp.org/confluence/download/attachments/720900/amqp-xml-doc0-8.pdf?version=1">AMQP Protocol Documentation</a>, the documentation of the commands in the 0-8 version of the protocol
href="http://www.amqp.org/confluence/download/attachments/720900/amqp-xml-doc0-9-1.pdf?version=1">AMQP Protocol Documentation</a>, the documentation of the commands in the 0-9-1 version of the protocol
@doc <p>An implementation of AMQP for Erlang.</p>
@doc
== Overview ==
This application provides an Erlang library to interact with an AMQP compliant message broker. The module documentation assumes that the programmer has some basic familiarity with the execution model defined in the AMQP specification.
This application provides an Erlang library to interact with an AMQP 0-9-1 compliant message broker. The module documentation assumes that the programmer has some basic familiarity with the execution model defined in the AMQP specification.
The main components are {@link amqp_connection} and {@link amqp_channel}. The {@link amqp_connection} module is used to open and close connections to an AMQP broker as well as creating channels. The {@link amqp_channel} module is used to send and receive commands and messages to and from a broker within the context of a channel.
@ -23,10 +23,8 @@ Many of the API functions take structured records as arguments. These records re
== Programming Model ==
For more information, refer to the Erlang AMQP client developer's guide on the RabbitMQ website.
For more information, refer to the Erlang AMQP client <a href="http://www.rabbitmq.com/erlang-client-user-guide.html">developer's guide</a> on the RabbitMQ website.
== RPC Components ==
The {@link amqp_rpc_server} module provides a generic building block to expose
Erlang functions via an RPC over AMQP mechanism. The {@link amqp_rpc_client}
provides a simple client utility to submit RPC requests to a server via AMQP.
The {@link amqp_rpc_server} module provides a generic building block to expose Erlang functions via an RPC over AMQP mechanism. The {@link amqp_rpc_client} provides a simple client utility to submit RPC requests to a server via AMQP.

View File

@ -109,8 +109,10 @@ channel_death_test() ->
%%---------------------------------------------------------------------------
new_connection() ->
{ok, Pid} = amqp_connection:start(direct),
Pid.
case amqp_connection:start(direct) of
{ok, Conn} -> Conn;
{error, _} = Error -> Error
end.
test_coverage() ->
rabbit_misc:enable_cover(),

View File

@ -102,9 +102,8 @@ channel_writer_death_test(Connection) ->
Publish = #'basic.publish'{routing_key = <<>>, exchange = <<>>},
Message = #amqp_msg{props = <<>>, payload = <<>>},
?assertExit(_, amqp_channel:call(Channel, Publish, Message)),
timer:sleep(300),
?assertNot(is_process_alive(Channel)),
?assertNot(is_process_alive(Connection)),
test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection),
ok.
%% An error in the channel process should result in the death of the entire
@ -113,9 +112,8 @@ channel_writer_death_test(Connection) ->
channel_death_test(Connection) ->
{ok, Channel} = amqp_connection:open_channel(Connection),
?assertExit(_, amqp_channel:call(Channel, bogus_message)),
timer:sleep(300),
?assertNot(is_process_alive(Channel)),
?assertNot(is_process_alive(Connection)),
test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection),
ok.
%% Attempting to send a shortstr longer than 255 bytes in a property field
@ -130,9 +128,8 @@ shortstr_overflow_property_test(Connection) ->
PBasic = #'P_basic'{content_type = SentString},
AmqpMsg = #amqp_msg{payload = Payload, props = PBasic},
?assertExit(_, amqp_channel:call(Channel, Publish, AmqpMsg)),
timer:sleep(300),
?assertNot(is_process_alive(Channel)),
?assertNot(is_process_alive(Connection)),
test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection),
ok.
%% Attempting to send a shortstr longer than 255 bytes in a method's field
@ -146,9 +143,8 @@ shortstr_overflow_field_test(Connection) ->
Channel, #'basic.consume'{queue = Q, no_ack = true,
consumer_tag = SentString},
self())),
timer:sleep(300),
?assertNot(is_process_alive(Channel)),
?assertNot(is_process_alive(Connection)),
test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection),
ok.
non_existent_user_test() ->

View File

@ -25,7 +25,7 @@
-module(network_client_SUITE).
-export([test_coverage/0]).
-export([test_coverage/0, new_connection/1]).
-include("amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
@ -41,10 +41,10 @@ basic_return_test() ->
basic_qos_test() ->
test_util:basic_qos_test(new_connection()).
basic_recover_test() ->
basic_recover_test() ->
test_util:basic_recover_test(new_connection()).
basic_consume_test() ->
basic_consume_test() ->
test_util:basic_consume_test(new_connection()).
large_content_test() ->
@ -98,7 +98,7 @@ channel_tune_negotiation_test() ->
%%---------------------------------------------------------------------------
%% Negative Tests
non_existent_exchange_test() ->
non_existent_exchange_test() ->
negative_test_util:non_existent_exchange_test(new_connection()).
bogus_rpc_test() ->
@ -130,20 +130,21 @@ shortstr_overflow_property_test() ->
shortstr_overflow_field_test() ->
negative_test_util:shortstr_overflow_field_test(new_connection()).
%%---------------------------------------------------------------------------
%% Common Functions
repeat(Fun, Times) ->
[ Fun(new_connection()) || _ <- lists:seq(1, Times)].
new_connection(Params) ->
{ok, Pid} = amqp_connection:start(network, Params),
Pid.
new_connection() ->
{ok, Pid} = amqp_connection:start(network),
Pid.
new_connection(#amqp_params{}).
new_connection(AmqpParams) ->
case amqp_connection:start(network, AmqpParams) of
{ok, Conn} -> Conn;
{error, _Err} = Error -> Error
end.
test_coverage() ->
rabbit_misc:enable_cover(),

View File

@ -87,10 +87,10 @@ new_connection() ->
{keyfile, CertsDir ++ "/client/key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, true}]},
{ok, Pid} = amqp_connection:start_link(Params),
[{supervisor, Sup}] = amqp_connection:info(Pid, [supervisor]),
unlink(Sup),
Pid.
case amqp_connection:start(network, Params) of
{ok, Conn} -> Conn;
{error, _} = Error -> Error
end.
test_coverage() ->
rabbit_misc:enable_cover(),

View File

@ -46,8 +46,8 @@
%%
%% This is an example of how the client interaction should work
%%
%% Connection = amqp_connection:start_network(),
%% Channel = amqp_connection:open_channel(Connection),
%% {ok, Connection} = amqp_connection:start(network),
%% {ok, Channel} = amqp_connection:open_channel(Connection),
%% %%...do something useful
%% amqp_channel:close(Channel),
%% amqp_connection:close(Connection).