Gave consumer registration it's own API

This commit is contained in:
Ben Hood 2008-12-06 23:34:09 +00:00
parent 612ae4248f
commit 7ffdf4cc85
2 changed files with 18 additions and 13 deletions

View File

@ -34,6 +34,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]).
-export([subscribe/3]).
-export([register_direct_peer/2]).
-export([register_return_handler/2]).
-export([register_flow_handler/2]).
@ -70,25 +71,28 @@
%% Generic AMQP RPC mechanism that expects a pseudo synchronous response
call(Channel, Method) ->
gen_server:call(Channel, {call, Method}).
%% Allows a consumer to be registered with the channel when invoking a BasicConsume
call(Channel, Method = #'basic.consume'{}, Consumer) ->
%% TODO This requires refactoring, because the handle_call callback
%% can perform the differentiation between tuples
gen_server:call(Channel, {basic_consume, Method, Consumer});
%% Generic AMQP send mechansim with content
%% Generic AMQP send mechanism with content
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method) ->
gen_server:cast(Channel, {cast, Method}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}).
%---------------------------------------------------------------------------
% Consumer registration
%---------------------------------------------------------------------------
%% Registers a consumer pid with the channel
subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
gen_server:call(Channel, {BasicConsume, Consumer}).
%---------------------------------------------------------------------------
% Direct peer registration
%---------------------------------------------------------------------------
@ -276,7 +280,7 @@ handle_call({call, Method, Content}, _From,
%% Top half of the basic consume process.
%% Sets up the consumer for registration in the bottom half of this RPC.
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{anon_sub_requests = Subs})
when Tag =:= undefined ; size(Tag) == 0 ->
NewSubs = queue:in({From,Consumer}, Subs),
@ -284,7 +288,7 @@ handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consu
NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
rpc_top_half(NewMethod, From, NewState);
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{tagged_sub_requests = Subs})
when is_binary(Tag) ->
% TODO test whether this tag already exists, either in the pending tagged

View File

@ -106,7 +106,8 @@ subscribe(Channel, Q, Consumer, Tag, NoAck) ->
consumer_tag = Tag,
no_local = false, no_ack = NoAck,
exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicConsume, Consumer),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:subscribe(Channel,BasicConsume, Consumer),
ConsumerTag.
unsubscribe(Channel, Tag) ->