updating documentation for the consumer mechanism

This commit is contained in:
Vlad Alexandru Ionescu 2011-03-30 21:23:13 +01:00
parent 077275abf5
commit 964543d122
7 changed files with 173 additions and 35 deletions

View File

@ -14,6 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
-module(amqp_auth_mechanisms).
-include("amqp_client.hrl").

View File

@ -184,7 +184,13 @@ register_confirm_handler(Channel, ConfirmHandler) ->
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
%% TODO doc
%% @spec (Channel, Message) -> ok
%% where
%% Channel = pid()
%% Message = any()
%% @doc This causes the channel to invoke Consumer:handle_message/2,
%% where Consumer is the amqp_gen_consumer implementation registered with
%% the channel.
send_to_consumer(Channel, Message) ->
gen_server:cast(Channel, {send_to_consumer, Message}).

View File

@ -105,29 +105,43 @@ start(Type, AmqpParams) ->
%% Commands
%%---------------------------------------------------------------------------
%% @doc Invokes open_channel(ConnectionPid, none).
%% Opens a channel without having to specify a channel number.
%% @doc Invokes open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
%% Opens a channel without having to specify a channel number. This uses the
%% default consumer implementation.
open_channel(ConnectionPid) ->
open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
open_channel(ConnectionPid, {_ConsumerModule, _ConsumerArgs} = Consumer) ->
%% @doc Invokes open_channel(ConnectionPid, none, Consumer).
%% Opens a channel without having to specify a channel number.
open_channel(ConnectionPid, {_, _} = Consumer) ->
open_channel(ConnectionPid, none, Consumer);
%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
%% @doc Invokes open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
%% Opens a channel, using the default consumer implementation.
open_channel(ConnectionPid, ChannelNumber)
when is_number(ChannelNumber) orelse ChannelNumber =:= none ->
open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
%% @spec (ConnectionPid, ChannelNumber, Consumer) -> Result
%% where
%% ChannelNumber = pos_integer() | 'none'
%% ConnectionPid = pid()
%% ChannelNumber = pos_integer() | 'none'
%% Consumer = {ConsumerModule, ConsumerArgs}
%% ConsumerModule = atom()
%% ConsumerArgs = [any()]
%% Result = {ok, ChannelPid} | {error, Error}
%% ChannelPid = pid()
%% @doc Opens an AMQP channel.<br/>
%% Opens a channel, using a proposed channel number and a specific consumer
%% implementation.<br/>
%% ConsumerModule must implement the amqp_gen_consumer behaviour. ConsumerArgs
%% is passed as parameter to ConsumerModule:init/1.<br/>
%% This function assumes that an AMQP connection (networked or direct)
%% has already been successfully established.<br/>
%% ChannelNumber must be less than or equal to the negotiated max_channel value,
%% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
%% value is 0.<br/>
%% In the direct connection, max_channel is always 0.
open_channel(ConnectionPid, ChannelNumber) ->
open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
open_channel(ConnectionPid, ChannelNumber,
{_ConsumerModule, _ConsumerArgs} = Consumer) ->
amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber, Consumer).

View File

@ -14,7 +14,19 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @doc TODO
%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
%% can be used as part of the Consumer parameter when opening AMQP
%% channels.<br/>
%% The Consumer parameter for this implementation is
%% {{@module}, [ConsumerPid]@}, where ConsumerPid is a process that
%% will receive queue subscription-related messages.<br/>
%% This consumer implementation causes the channel to send to the ConsumerPid
%% all basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver
%% messages received from the server.<br/>
%% In addition, if the channel exits abnormally, an exit signal with the
%% channel's exit reason is sent to ConsumerPid.<br/>
%% <br/>
%% This module has no public functions.
-module(amqp_direct_consumer).
-behaviour(amqp_gen_consumer).

View File

@ -14,7 +14,10 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @doc TODO
%% @doc A behaviour module for implementing consumers for amqp_channel. To
%% specify a consumer implementation for a channel, use
%% amqp_connection:open_channel/{2,3}.<br/>
%% All callbacks are called withing the channel process.
-module(amqp_gen_consumer).
-export([behaviour_info/1]).
@ -23,40 +26,69 @@
%% Behaviour
%%---------------------------------------------------------------------------
%% TODO: doc
%% all callbacks run in the channel process
%% init: called when channel is started.
%% handle_consume_ok: called on each basic.consume_ok
%% handle_cancel_ok: called on each basic.cancel_ok (sent by the server)
%% handle_cancel: called on each basic.cancel (sent by the server)
%% handle_deliver: called on each basic.deliver
%% handle_message: called on amqp_channel:send_to_consumer/2
%% terminate: called after channel has shut down
%% @private
behaviour_info(callbacks) ->
[
%% init(Args) -> {ok, InitialState}
%% @spec Module:init(Args) -> {ok, InitialState}
%% where
%% Args = [any()]
%% InitialState = any()
%% @doc This function is called by the channel, when it starts up.
{init, 1},
%% handle_consume(#'basic.consume_ok'{}, State) -> {ok, NewState}
%% @type consume_ok() = #'basic.consume_ok'{}.
%% The AMQP method returned in response to basic.consume.
%% @spec Module:handle_consume(consume_ok(), State) -> {ok, NewState}
%% where
%% State = NewState = any()
%% @doc This function is called by the channel every time a
%% basic.consume_ok is received from the server.
{handle_consume_ok, 2},
%% handle_cancel_ok(#'basic.cancel_ok'{}, State) -> {ok, NewState}
%% @type cancel_ok() = #'basic.cancel_ok'{}.
%% The AMQP method returned as reply to basicl.cancel.
%% @spec Module:handle_cancel_ok(cancel_ok(), State) -> {ok, NewState}
%% where
%% State = NewState = any()
%% @doc This function is called by the channel every time a basic.cancel_ok
%% is received from the server.
{handle_cancel_ok, 2},
%% handle_cancel(#'basic.cancel'{}, State) -> {ok, NewState}
%% @type cancel() = #'basic.cancel'{}.
%% The AMQP method used to cancel a subscription.
%% @spec Module:handle_cancel(cancel(), State) -> {ok, NewState}
%% where
%% State = NewState = any()
%% @doc This function is called by the channel every time a basic.cancel
%% is received from the server.
{handle_cancel, 2},
%% handle_deliver(#'basic.deliver', #amqp_msg{}, State} -> {ok, NewState}
%% @type deliver() = #'basic.deliver'{}.
%% The AMQP method sent when a message is delivered from a subscribed
%% queue.
%% @spec Module:handle_deliver(deliver(), #amqp_msg{}, State} ->
%% {ok, NewState}
%% where
%% State = NewState = any()
%% @doc This function is called by the channel every time a basic.deliver
%% is received from the server.
{handle_deliver, 3},
%% handle_message(Message, State) -> {ok, NewState}
%% @spec Module:handle_message(Message, State) -> {ok, NewState}
%% where
%% State = NewState = any()
%% @doc This function is called by the channel when calling
%% amqp_channel:send_to_consumer/2.
{handle_message, 2},
%% terminate(Reason, State) -> _
%% @spec Module:terminate(Reason, State) -> _
%% where
%% State = any()
%% Reason = any()
%% @doc This function is called by the channel after it has shut down and
%% just before its process exits.
{terminate, 2}
];
behaviour_info(_Other) ->

View File

@ -14,7 +14,27 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @doc TODO
%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
%% can be used as part of the Consumer parameter when opening AMQP
%% channels.<br/>
%% The Consumer parameter for this implementation is {{@module}, []@}<br/>
%% This consumer implementation keeps track of consumer tags and sends
%% the subscription-relevant messages to the registered consumers, according
%% to an internal tag dictionary.<br/>
%% Use {@module}:subscribe/3 to subscribe a consumer to a queue and
%% {@module}:cancel/2 to cancel a subscription.<br/>
%% The channel will send to the relevant registered consumers the
%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
%% received from the server.<br/>
%% If a consumer is not registered for a given consumer tag, the message
%% is sent to the default consumer registered with
%% {@module}:register_default_consumer. If there is no default consumer
%% registered in this case, an exception occurs and the channel is abrubptly
%% terminated.<br/>
%% amqp_channel:call(ChannelPid, #'basic.consume'{}) can also be used to
%% subscribe to a queue, but one must register a default consumer for messages
%% to be delivered to, beforehand. Failing to do so generates the
%% above-mentioned exception.
-module(amqp_selective_consumer).
-include("amqp_client.hrl").
@ -33,7 +53,28 @@
%% Interface
%%---------------------------------------------------------------------------
%% TODO doc
%% @type consume() = #'basic.consume'{}.
%% The AMQP method that is used to subscribe a consumer to a queue.
%% @type consume_ok() = #'basic.consume_ok'{}.
%% The AMQP method returned in response to basic.consume.
%% @spec (ChannelPid, consume(), ConsumerPid) -> Result
%% where
%% ChannelPid = pid()
%% ConsumerPid = pid()
%% Result = consume_ok() | ok | {error, command_invalid}
%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
%% the queue defined in the #'basic.consume'{} method record. Note that
%% both the process invoking this method and the supplied consumer process
%% receive an acknowledgement of the subscription. The calling process will
%% receive the acknowledgement as the return value of this function, whereas
%% the consumer process will receive the notification as a message,
%% asynchronously.<br/>
%% This function returns {error, command_invalid} if consumer_tag is not
%% specified and nowait is true.<br/>
%% Attempting to subscribe with a consumer_tag that is already in use will
%% cause an exception and the channel will terminate. If nowait is set to true
%% in this case, the function will return ok, but the channel will terminate
%% with an error.
subscribe(ChannelPid, #'basic.consume'{nowait = false} = BasicConsume,
ConsumerPid) ->
ConsumeOk = #'basic.consume_ok'{consumer_tag = RetTag} =
@ -49,12 +90,44 @@ subscribe(ChannelPid, #'basic.consume'{nowait = true,
ok = call_consumer(ChannelPid, {subscribe_nowait, Tag, ConsumerPid}),
amqp_channel:call(ChannelPid, BasicConsume).
%% TODO doc
%% (provided for completeness)
%% @type cancel() = #'basic.cancel'{}.
%% The AMQP method used to cancel a subscription.
%% @spec (ChannelPid, Cancel) -> amqp_method() | ok
%% where
%% ChannelPid = pid()
%% Cancel = cancel()
%% @doc This function is the same as calling
%% amqp_channel:call(ChannelPid, Cancel) and is only provided for completeness.
cancel(ChannelPid, #'basic.cancel'{} = Cancel) ->
amqp_channel:call(ChannelPid, Cancel).
%% TODO doc
%% @spec (ChannelPid, ConsumerPid) -> ok
%% where
%% ChannelPid = pid()
%% ConsumerPid = pid()
%% @doc This function registers a default consumer with the channel. A default
%% consumer is used in two situations:<br/>
%% <br/>
%% 1) A subscription was made via
%% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
%% {@module}:subscribe/3) and hence there is no consumer pid registered with the
%% consumer tag.<br/>
%% <br/>
%% 2) The following sequence of events occurs:<br/>
%% <br/>
%% - subscribe is used with basic.consume with explicit acks<br/>
%% - some deliveries take place but are not acked<br/>
%% - a basic.cancel is issued<br/>
%% - a basic.recover{requeue = false} is issued<br/>
%% <br/>
%% Since requeue is specified to be false in the basic.recover, the spec
%% states that the message must be redelivered to "the original recipient"
%% - i.e. the same channel / consumer-tag. But the consumer is no longer
%% active. <br/>
%% <br/>
%% In these two cases, the relevant deliveries will be sent to the default
%% consumer.
register_default_consumer(ChannelPid, ConsumerPid) ->
call_consumer(ChannelPid, {register_default_consumer, ConsumerPid}).
@ -76,6 +149,7 @@ handle_consume_ok(#'basic.consume_ok'{consumer_tag = Tag} = ConsumeOk,
handle_cancel_ok(#'basic.cancel_ok'{consumer_tag = Tag} = CancelOk, State) ->
deliver_or_queue(Tag, CancelOk, State).
%% @private
handle_cancel(#'basic.cancel'{consumer_tag = Tag} = Cancel, State) ->
deliver_or_queue(Tag, Cancel, State).

View File

@ -103,7 +103,7 @@ pub_and_close_test_() ->
end}.
channel_tune_negotiation_test() ->
amqp_connection:close(new_connection(#amqp_params{ channel_max = 10 })).
amqp_connection:close(new_connection(#amqp_params{channel_max = 10})).
confirm_test() ->
test_util:confirm_test(new_connection()).
@ -177,4 +177,3 @@ test_coverage() ->
rabbit_misc:enable_cover(),
test(),
rabbit_misc:report_cover().