Client and server queues are now auto_delete and exclusive
This commit is contained in:
parent
778bc6d891
commit
ae742ea9fc
|
@ -57,7 +57,7 @@ call(RpcClientPid, Payload) ->
|
||||||
|
|
||||||
% Sets up a reply queue for this client to listen on
|
% Sets up a reply queue for this client to listen on
|
||||||
setup_reply_queue(State = #rpc_client_state{channel = Channel}) ->
|
setup_reply_queue(State = #rpc_client_state{channel = Channel}) ->
|
||||||
Q = lib_amqp:declare_queue(Channel, <<>>),
|
Q = lib_amqp:declare_private_queue(Channel),
|
||||||
State#rpc_client_state{reply_queue = Q}.
|
State#rpc_client_state{reply_queue = Q}.
|
||||||
|
|
||||||
% Registers this RPC client instance as a consumer to handle rpc responses
|
% Registers this RPC client instance as a consumer to handle rpc responses
|
||||||
|
|
|
@ -54,7 +54,7 @@ stop(Pid) ->
|
||||||
|
|
||||||
init([Connection, Queue, Fun]) ->
|
init([Connection, Queue, Fun]) ->
|
||||||
Channel = lib_amqp:start_channel(Connection),
|
Channel = lib_amqp:start_channel(Connection),
|
||||||
lib_amqp:declare_queue(Channel, Queue),
|
lib_amqp:declare_private_queue(Channel, Queue),
|
||||||
Tag = lib_amqp:subscribe(Channel, Queue, self()),
|
Tag = lib_amqp:subscribe(Channel, Queue, self()),
|
||||||
State = #rpc_server_state{channel = Channel,
|
State = #rpc_server_state{channel = Channel,
|
||||||
consumer_tag = Tag,
|
consumer_tag = Tag,
|
||||||
|
|
|
@ -130,17 +130,40 @@ unsubscribe(Channel, Tag) ->
|
||||||
#'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel),
|
#'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%%---------------------------------------------------------------------------
|
||||||
|
%% Convenience functions for manipulating queues
|
||||||
|
|
||||||
|
%% TODO This whole part of the API needs to be refactored to reflect current
|
||||||
|
%% usage patterns in a sensible way using the defaults that are in the spec
|
||||||
|
%% file
|
||||||
declare_queue(Channel) ->
|
declare_queue(Channel) ->
|
||||||
declare_queue(Channel, <<>>).
|
declare_queue(Channel, <<>>).
|
||||||
|
|
||||||
|
declare_queue(Channel, QueueDeclare = #'queue.declare'{}) ->
|
||||||
|
#'queue.declare_ok'{queue = QueueName}
|
||||||
|
= amqp_channel:call(Channel, QueueDeclare),
|
||||||
|
QueueName;
|
||||||
|
|
||||||
declare_queue(Channel, Q) ->
|
declare_queue(Channel, Q) ->
|
||||||
|
%% TODO Specifying these defaults is unecessary - this is already taken
|
||||||
|
%% care of in the spec file
|
||||||
QueueDeclare = #'queue.declare'{queue = Q,
|
QueueDeclare = #'queue.declare'{queue = Q,
|
||||||
passive = false, durable = false,
|
passive = false, durable = false,
|
||||||
exclusive = false, auto_delete = false,
|
exclusive = false, auto_delete = false,
|
||||||
nowait = false, arguments = []},
|
nowait = false, arguments = []},
|
||||||
#'queue.declare_ok'{queue = Q1}
|
declare_queue(Channel, QueueDeclare).
|
||||||
= amqp_channel:call(Channel, QueueDeclare),
|
|
||||||
Q1.
|
%% Creates a queue that is exclusive and auto-delete
|
||||||
|
declare_private_queue(Channel) ->
|
||||||
|
declare_queue(Channel, #'queue.declare'{exclusive = true,
|
||||||
|
auto_delete = true}).
|
||||||
|
|
||||||
|
declare_private_queue(Channel, QueueName) ->
|
||||||
|
declare_queue(Channel, #'queue.declare'{queue = QueueName,
|
||||||
|
exclusive = true,
|
||||||
|
auto_delete = true}).
|
||||||
|
|
||||||
|
%%---------------------------------------------------------------------------
|
||||||
|
|
||||||
delete_queue(Channel, Q) ->
|
delete_queue(Channel, Q) ->
|
||||||
QueueDelete = #'queue.delete'{queue = Q,
|
QueueDelete = #'queue.delete'{queue = Q,
|
||||||
|
|
Loading…
Reference in New Issue