remove selective_consumer:cancel/2 and comments mentioning it or subscribe/2

This commit is contained in:
Alexandru Scvortov 2011-06-29 13:28:27 +01:00
parent d457f7c7b6
commit 4e56303921
3 changed files with 16 additions and 43 deletions

View File

@ -170,7 +170,7 @@ init([ConsumerModule, ExtraParams]) ->
{ok, MState} = ConsumerModule:init(ExtraParams),
{ok, #state{module = ConsumerModule, module_state = MState}}.
handle_call({consumer_call, Call}, From,
handle_call({consumer_call, Call}, _From,
State = #state{module = ConsumerModule,
module_state = MState}) ->
case ConsumerModule:handle_call(Call, MState) of

View File

@ -23,8 +23,9 @@
%% the subscription-relevant messages to the registered consumers, according
%% to an internal tag dictionary.<br/>
%% <br/>
%% Use {@module}:subscribe/3 to subscribe a consumer to a queue and
%% {@module}:cancel/2 to cancel a subscription.<br/>
%% Send a #basic.consume{} message to the channel to subscribe a
%% consumer to a queue and send a #basic.cancel{} message to cancel a
%% subscription.<br/>
%% <br/>
%% The channel will send to the relevant registered consumers the
%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
@ -33,22 +34,8 @@
%% If a consumer is not registered for a given consumer tag, the message
%% is sent to the default consumer registered with
%% {@module}:register_default_consumer. If there is no default consumer
%% registered in this case, an exception occurs and the channel is abrubptly
%% registered in this case, an exception occurs and the channel is abruptly
%% terminated.<br/>
%% <br/>
%% amqp_channel:call(ChannelPid, #'basic.consume'{}) can also be used to
%% subscribe to a queue, but one must register a default consumer for messages
%% to be delivered to, beforehand. Failing to do so generates the
%% above-mentioned exception.<br/>
%% <br/>
%% This consumer implementation creates a link between the channel and the
%% registered consumers (either through register_default_consumer/2 or
%% through subscribe/3). A cancel (either issued by the user application or the
%% server) causes the link to be removed. In addition, registering another
%% default consumer causes the old one to be unjlinked.<br/>
%% Warning! It is not recommended to rely on a consumer on killing off the
%% channel (through the exit signal). That may cause messages to get lost.
%% Always use amqp_channel:close/{1,3} for a clean shut down.
-module(amqp_selective_consumer).
-include("amqp_client.hrl").
@ -56,7 +43,7 @@
-behaviour(amqp_gen_consumer).
-export([cancel/2, register_default_consumer/2]).
-export([register_default_consumer/2]).
-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
handle_cancel/2, handle_deliver/3, handle_down/4, handle_call/3,
terminate/2]).
@ -93,17 +80,6 @@
%% to throw. If nowait is set to true the function will return 'error'
%% immediately, and the channel will be terminated by the server.
%% @type cancel() = #'basic.cancel'{}.
%% The AMQP method used to cancel a subscription.
%% @spec (ChannelPid, Cancel) -> amqp_method() | ok
%% where
%% ChannelPid = pid()
%% Cancel = cancel()
%% @doc This function is the same as calling
%% amqp_channel:call(ChannelPid, Cancel) and is only provided for completeness.
cancel(ChannelPid, #'basic.cancel'{} = Cancel) ->
amqp_channel:call(ChannelPid, Cancel).
%% @spec (ChannelPid, ConsumerPid) -> ok
%% where
%% ChannelPid = pid()
@ -171,7 +147,6 @@ handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
end,
case {Ok, BasicConsume} of
{true, #'basic.consume'{nowait = true}} ->
io:format("monitoring ~p~n\n", [Pid]),
{ok, State#state
{consumers = dict:store(Tag, Pid, Consumers),
monitors = dict:store(Pid, monitor(process, Pid), Monitors)}};
@ -193,7 +168,7 @@ handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
%% @private
handle_down(_MRef, Pid, _Info, State = #state{monitors = Monitors}) ->
case dict:find(Pid, Monitors) of
{ok, Tag} ->
{ok, _Tag} ->
State#state{monitors = dict:erase(Pid, Monitors)};
error ->
%% unnamed consumer went down before receiving consume_ok

View File

@ -332,8 +332,8 @@ consume_loop(Channel, X, RoutingKey, Parent, Tag) ->
exchange = X,
routing_key = RoutingKey}),
#'basic.consume_ok'{} =
amqp_selective_consumer:subscribe(
Channel, #'basic.consume'{queue = Q, consumer_tag = Tag}, self()),
amqp_channel:call(Channel,
#'basic.consume'{queue = Q, consumer_tag = Tag}),
receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
receive {#'basic.deliver'{}, _} -> ok end,
#'basic.cancel_ok'{} =
@ -348,8 +348,7 @@ consume_notification_test(Connection) ->
#'queue.declare_ok'{} =
amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
#'basic.consume_ok'{consumer_tag = CTag} = ConsumeOk =
amqp_selective_consumer:subscribe(
Channel, #'basic.consume'{queue = Q}, self()),
amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
receive ConsumeOk -> ok end,
#'queue.delete_ok'{} =
amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
@ -427,8 +426,8 @@ basic_qos_test(Connection, Prefetch) ->
{ok, Channel} = amqp_connection:open_channel(Connection),
amqp_channel:call(Channel,
#'basic.qos'{prefetch_count = Prefetch}),
amqp_selective_consumer:subscribe(
Channel, #'basic.consume'{queue = Q}, self()),
amqp_channel:call(Channel,
#'basic.consume'{queue = Q}),
Parent ! finished,
sleeping_consumer(Channel, Sleep, Parent)
end) || Sleep <- Workers],
@ -496,11 +495,10 @@ confirm_test(Connection) ->
subscribe_nowait_test(Connection) ->
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Q} = setup_publish(Channel),
ok = amqp_selective_consumer:subscribe(
Channel, #'basic.consume'{queue = Q,
consumer_tag = uuid(),
nowait = true},
self()),
ok = amqp_channel:call(Channel,
#'basic.consume'{queue = Q,
consumer_tag = uuid(),
nowait = true}),
receive #'basic.consume_ok'{} -> exit(unexpected_consume_ok)
after 0 -> ok
end,