Merged 19344 into 19625

This commit is contained in:
Ben Hood 2008-12-07 14:40:00 +00:00
commit 3b8618d974
18 changed files with 677 additions and 496 deletions

View File

@ -26,25 +26,40 @@
EBIN_DIR=ebin
SOURCE_DIR=src
INCLUDE_DIR=include
ERLC_FLAGS=-W0
DIST_DIR=rabbitmq-erlang-client
LOAD_PATH=ebin rabbitmq_server/ebin
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl)
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info
BROKER_DIR=../rabbitmq-server
BROKER_SYMLINK=rabbitmq_server
NODENAME=rabbit_test_direct
MNESIA_DIR=/tmp/rabbitmq_$(NODENAME)_mnesia
LOG_BASE=/tmp
ERL_CALL=erl_call -sname $(NODENAME) -e
all: $(EBIN_DIR) $(TARGETS)
compile:
mkdir -p $(EBIN_DIR)
erlc +debug_info -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(SOURCE_DIR)/*.erl
$(BROKER_SYMLINK):
ifdef BROKER_DIR
ln -sf $(BROKER_DIR) $(BROKER_SYMLINK)
endif
all: compile
$(EBIN_DIR):
mkdir -p $@
run_node: compile
LOG_BASE=/tmp SKIP_HEART=true SKIP_LOG_ARGS=true MNESIA_DIR=$(MNESIA_DIR) RABBIT_ARGS="-detached -pa ./ebin" NODENAME=$(NODENAME) rabbitmq-server
$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDES) $(BROKER_SYMLINK)
erlc $(ERLC_OPTS) $<
run_server:
NODE_IP_ADDRESS=$(NODE_IP_ADDRESS) NODE_PORT=$(NODE_PORT) NODE_ONLY=true LOG_BASE=$(LOG_BASE) RABBIT_ARGS="$(RABBIT_ARGS) -s rabbit" MNESIA_DIR=$(MNESIA_DIR) $(BROKER_DIR)/scripts/rabbitmq-server
sleep 2 # so that the node is initialized when the tests are run
all_tests: test_network test_network_coverage test_direct test_direct_coverage
@ -53,26 +68,32 @@ all_tests: test_network test_network_coverage test_direct test_direct_coverage
tests_network: test_network test_network_coverage
$(ERL_CALL) -q
test_network: run_node
erl -pa ebin -noshell -eval 'network_client_test:test(),halt().'
test_network: $(TARGETS)
erl -pa $(LOAD_PATH) -noshell -eval 'network_client_test:test(),halt().'
test_network_coverage: run_node
erl -pa ebin -noshell -eval 'network_client_test:test_coverage(),halt().'
test_network_coverage: $(TARGETS)
erl -pa $(LOAD_PATH) -noshell -eval 'network_client_test:test_coverage(),halt().'
tests_direct: test_direct test_direct_coverage
$(ERL_CALL) -q
rm -rf $(MNESIA_DIR)
test_direct:
erl -pa ebin -mnesia dir tmp -boot start_sasl -s rabbit -noshell -eval \
'direct_client_test:test(),halt().'
test_direct: $(TARGETS)
erl -pa $(LOAD_PATH) -noshell -mnesia dir tmp -boot start_sasl -s rabbit -noshell \
-sasl sasl_error_logger '{file, "'${LOG_BASE}'/rabbit-sasl.log"}' \
-kernel error_logger '{file, "'${LOG_BASE}'/rabbit.log"}' \
-eval 'direct_client_test:test(),halt().'
test_direct_coverage:
erl -pa ebin -mnesia dir tmp -boot start_sasl -s rabbit -noshell -eval \
'direct_client_test:test_coverage(),halt().'
test_direct_coverage: $(TARGETS)
erl -pa $(LOAD_PATH) -noshell -mnesia dir tmp -boot start_sasl -s rabbit -noshell \
-sasl sasl_error_logger '{file, "'${LOG_BASE}'/rabbit-sasl.log"}' \
-kernel error_logger '{file, "'${LOG_BASE}'/rabbit.log"}' \
-eval 'direct_client_test:test_coverage(),halt().'
clean:
rm $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/*.beam
rm -f rabbitmq_server erl_crash.dump
rm -fr cover dist
source-tarball:
mkdir -p dist/$(DIST_DIR)

View File

@ -10,7 +10,7 @@ message passing to a RabbitMQ broker.
The API exposed to the user is common to both clients, so each version
can be used interchangeably without having to modify any client code.
The TCP networked client has been tested with RabbitMQ server 1.2.0,
The TCP networked client has been tested with RabbitMQ server 1.4.0,
but should theoretically work with any 0-8 compliant AMQP server.
The direct client is bound to an 0-8 compliant broker using native
@ -35,58 +35,86 @@ Prerequisites
In order to compile/run this code you must have the following
installed:
- Erlang/OTP, R11B-0 or later, http://www.erlang.org/download.html
- The RabbitMQ server, 200710071940 snapshot or later,
http://dev.rabbitmq.com/snapshots/rabbitmq/
Compile and install this in the OTP library directory.
- Eunit, latest version from svn at
http://svn.process-one.net/contribs/trunk/eunit
Compile and install this in the OTP library directory.
- Erlang/OTP, R11B-5 or later, http://www.erlang.org/download.html
- The RabbitMQ server, 93cc2ca0ba62 or later
- Eunit, the Erlang unit testing framework - currently the whole build process
depends on eunit because all of the modules are compiled together.
A future version of the build process could remove this dependency when you
only want to compile the core libraries.
Compile the Erlang client
Getting Eunit
-------------
The test suite uses eunit which is either available bundled with OTP from
release R12B-5 onwards or as a separate download that you will need to build
yourself if you are using an older version of Erlang.
* If you are using R12B-5 or newer:
Just skip to the next section.
* If you are using R12B-4 or older:
Check out eunit from their Subversion repository and build it:
$ svn co http://svn.process-one.net/contribs/trunk/eunit eunit
$ cd eunit
$ make
After this has sucessfully been built, you will need to create a symlink to
the eunit directory in your OTP installation directory:
$ cd $OTP_HOME/lib/erlang/lib
$ ln -sf PATH_TO_EUNIT eunit
where $OTP_HOME is the location of your Erlang/OTP installation.
Compiling the Erlang client
-------------------------
Go to the base directory of the AMQP Erlang client directory and run
'make'.
Running the network client
--------------------------
* If you have "installed" the RabbitMQ server:
You will have a symlink to the rabbitmq-server directory in your OTP
directory, so all you have to do is to run make:
$ make
* If you don't have the RabbitMQ server installed:
You will need to get a copy of the server in order to be able to use it's
header files and runtime libraries. A good place to put this is in the sibling
directory to the Erlang client, which is the default that Makefile expects.
In this case, you can just run make:
$ make
If the source tree for the server is not in the sibling directory, you will
need to specify the path to this directory:
$ make BROKER_DIR=PATH_TO_THE_SERVER
Running the network client tests
--------------------------------
In order to run the network client, you need to run the RabbitMQ
server in a separate Erlang process (or use any other AMQP
server in a separate Erlang process (or use any other compliant AMQP
server). Start your server as usual.
After you have done this, you can run the unit tests:
$ make test_network
$ make test_network
To get more examples of the API, look at the functions in the
test_util module.
Running the direct client
-------------------------
Running the direct client tests
-------------------------------
The direct client has to be run in the same Erlang VM instance as the
RabbitMQ server.
RabbitMQ server. In order to use the makefile to run the direct client tests,
you will need to shutdown any other running instance of RabbitMQ that you may
have on your machine. This is because the Makefile target for running the
direct tests boots its own instance of RabbitMQ. To run these tests, use the
following target.
The included Makefile will do this if the RabbitMQ server has been installed
such that the script rabbitmq-server is in the path (note that /usr/sbin may
not be in your path).
$ make test_direct
After you have checked this, you can run the unit tests:
$ make test_direct
Running the Integration Scenarios
---------------------------------
The network_integration_test module demonstrates how the base API can be
extended to implement an Rpc client over AMQP.
This client does a full Rpc end to end by combining the AMQP transport
with the Hessian data binding protocol.
To run this test end to end, you will need to install the Erlang module
for hessian into to the default OTP library path.
The Hessian library can be downloaded from http://code.google.com/p/cotton/,
version 0.2.2 or later is required.
You will also need to patch the Rabbit java client (using snapshot version rabbitmq-200710291145) with the patch file called primitive_call.patch. The resulting binary from this will need to be put on the classpath of the Java project.

View File

@ -47,23 +47,19 @@
tagged_sub_requests = dict:new(),
closing = false,
return_handler_pid,
flow_control = false,
flow_handler_pid,
consumers = dict:new()}).
-record(rpc_client_state, {broker_config,
-record(rpc_client_state, {channel,
consumer_tag,
reply_queue,
exchange,
routing_key,
continuations = dict:new(),
correlation_id = 0,
type_mapping}).
correlation_id = 0}).
-record(rpc_handler_state, {broker_config,
server_pid,
server_name,
type_mapping
}).
-record(broker_config, {channel_pid,
exchange,
routing_key,
bind_key,
queue}).
-record(rpc_server_state, {channel,
consumer_tag,
handler}).

View File

@ -34,8 +34,10 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]).
-export([subscribe/3]).
-export([register_direct_peer/2]).
-export([register_return_handler/2]).
-export([register_flow_handler/2]).
%% This diagram shows the interaction between the different component processes
%% in an AMQP client scenario.
@ -69,21 +71,28 @@
%% Generic AMQP RPC mechanism that expects a pseudo synchronous response
call(Channel, Method) ->
gen_server:call(Channel, {call, Method}).
%% Generic AMQP send mechanism with content
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content}).
%% Allows a consumer to be registered with the channel when invoking a BasicConsume
call(Channel, Method = #'basic.consume'{}, Consumer) ->
%% TODO This requires refactoring, because the handle_call callback
%% can perform the differentiation between tuples
gen_server:call(Channel, {basic_consume, Method, Consumer}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method) ->
gen_server:cast(Channel, {cast, Method}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}).
%---------------------------------------------------------------------------
% Consumer registration
%---------------------------------------------------------------------------
%% Registers a consumer pid with the channel
subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
gen_server:call(Channel, {BasicConsume, Consumer}).
%---------------------------------------------------------------------------
% Direct peer registration
%---------------------------------------------------------------------------
@ -103,6 +112,10 @@ register_direct_peer(Channel, Peer) ->
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
%% Registers a handler to deal with flow control
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
%---------------------------------------------------------------------------
% Internal plumbing
%---------------------------------------------------------------------------
@ -116,17 +129,17 @@ rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer,
case queue:len(NewRequestQueue) of
1 ->
Do2(Writer,Method);
Other ->
_ ->
ok
end,
{noreply, NewState}.
rpc_bottom_half(#'channel.close'{reply_code = ReplyCode,
reply_text = ReplyText},State) ->
io:format("Channel received close from peer, code: ~p , message: ~p~n",[ReplyCode,ReplyText]),
NewState = channel_cleanup(State),
{stop, normal, NewState};
rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
rpc_requests = RequestQueue,
do2 = Do2}) ->
@ -143,11 +156,8 @@ rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
end,
{noreply, State#channel_state{rpc_requests = NewRequestQueue}}.
subscription_top_half(Method, From, State = #channel_state{writer_pid = Writer, do2 = Do2}) ->
Do2(Writer,Method),
{noreply, State}.
resolve_consumer(ConsumerTag, #channel_state{consumers = []}) ->
resolve_consumer(_ConsumerTag, #channel_state{consumers = []}) ->
exit(no_consumers_registered);
resolve_consumer(ConsumerTag, #channel_state{consumers = Consumers}) ->
@ -169,7 +179,7 @@ channel_cleanup(State = #channel_state{consumers = []}) ->
shutdown_writer(State);
channel_cleanup(State = #channel_state{consumers = Consumers}) ->
Terminator = fun(ConsumerTag, Consumer) -> Consumer ! shutdown end,
Terminator = fun(_ConsumerTag, Consumer) -> Consumer ! shutdown end,
dict:map(Terminator, Consumers),
NewState = State#channel_state{closing = true, consumers = []},
shutdown_writer(NewState).
@ -186,14 +196,14 @@ return_handler(State = #channel_state{return_handler_pid = ReturnHandler}) ->
handle_method(BasicConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag},
State = #channel_state{anon_sub_requests = Anon,
tagged_sub_requests = Tagged}) ->
{From, Consumer, State0} =
{_From, Consumer, State0} =
case dict:find(ConsumerTag,Tagged) of
{ok, {F,C}} ->
NewTagged = dict:erase(ConsumerTag,Tagged),
{F,C,State#channel_state{tagged_sub_requests = NewTagged}};
error ->
case queue:out(Anon) of
{empty,X} ->
{empty,_} ->
exit(anonymous_queue_empty, ConsumerTag);
{{value, {F,C}}, NewAnon} ->
{F,C,State#channel_state{anon_sub_requests = NewAnon}}
@ -213,6 +223,19 @@ handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
{noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
{stop, normal, NewState};
%% This handles the flow control flag that the broker initiates.
%% If defined, it informs the flow control handler to suspend submitting
%% any content bearing methods
handle_method(Flow = #'channel.flow'{active = Active},
State = #channel_state{writer_pid = Writer, do2 = Do2,
flow_handler_pid = FlowHandler}) ->
case FlowHandler of
undefined -> ok;
_ -> FlowHandler ! Flow
end,
Do2(Writer, #'channel.flow_ok'{active = Active}),
{noreply, State#channel_state{flow_control = not(Active)}};
handle_method(Method, State) ->
rpc_bottom_half(Method, State).
@ -246,9 +269,18 @@ init([InitialState]) ->
handle_call({call, Method}, From, State = #channel_state{closing = false}) ->
rpc_top_half(Method, From, State);
handle_call({call, _Method, _Content}, _From,
State = #channel_state{flow_control = true}) ->
{reply, blocked, State};
handle_call({call, Method, Content}, _From,
State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
Do3(Writer, Method, Content),
{reply, ok, State};
%% Top half of the basic consume process.
%% Sets up the consumer for registration in the bottom half of this RPC.
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{anon_sub_requests = Subs})
when Tag =:= undefined ; size(Tag) == 0 ->
NewSubs = queue:in({From,Consumer}, Subs),
@ -256,7 +288,7 @@ handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consu
NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
rpc_top_half(NewMethod, From, NewState);
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{tagged_sub_requests = Subs})
when is_binary(Tag) ->
% TODO test whether this tag already exists, either in the pending tagged
@ -270,8 +302,17 @@ handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer, do2 = Do
Do2(Writer, Method),
{noreply, State};
%% This discards any message submitted to the channel when flow control is
%% active
handle_cast({cast, Method, _Content},
State = #channel_state{flow_control = true}) ->
% Discard the message and log it
io:format("Discarding content bearing method (~p) ~n", [Method]),
{noreply, State};
%% Standard implementation of the cast/3 command
handle_cast({cast, Method, Content}, State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
handle_cast({cast, Method, Content},
State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
Do3(Writer, Method, Content),
{noreply, State};
@ -287,7 +328,12 @@ handle_cast({register_return_handler, ReturnHandler}, State) ->
NewState = State#channel_state{return_handler_pid = ReturnHandler},
{noreply, NewState};
handle_cast({notify_sent, Peer}, State) ->
%% Registers a handler to process flow control messages
handle_cast({register_flow_handler, FlowHandler}, State) ->
NewState = State#channel_state{flow_handler_pid = FlowHandler},
{noreply, NewState};
handle_cast({notify_sent, _Peer}, State) ->
{noreply, State}.
%---------------------------------------------------------------------------
@ -319,6 +365,16 @@ handle_info(shutdown, State) ->
NewState = channel_cleanup(State),
{stop, normal, NewState};
%% Handle a trapped exit, e.g. from the direct peer
%% In the direct case this is the local channel
%% In the network case this is the process that writes to the socket
%% on a per channel basis
handle_info({'EXIT', _Pid, Reason},
State = #channel_state{number = Number}) ->
io:format("Channel ~p is shutting down due to: ~p~n",[Number, Reason]),
NewState = channel_cleanup(State),
{stop, normal, NewState};
%---------------------------------------------------------------------------
% This is for a race condition between a close.close_ok and a subsequent channel.open
%---------------------------------------------------------------------------
@ -341,10 +397,13 @@ handle_info( {channel_exception, Channel, Reason}, State) ->
%---------------------------------------------------------------------------
% Rest of the gen_server callbacks
%---------------------------------------------------------------------------
terminate(normal, State) -> ok;
terminate(Reason, State) ->
terminate(normal, _State) ->
ok;
terminate(_Reason, State) ->
channel_cleanup(State),
ok.
code_change(_OldVsn, State, _Extra) ->
State.

View File

@ -123,7 +123,7 @@ start_channel(ChannelNumber,CloseFun,Do2,Do3,State = #connection_state{reader_pi
assign_channel_number(none, #connection_state{channels = Channels, channel_max = Max}) ->
allocate_channel_number(dict:fetch_keys(Channels), Max);
assign_channel_number(ChannelNumber, State) ->
assign_channel_number(ChannelNumber, _State) ->
%% TODO bug: check whether this is already taken
ChannelNumber.
@ -141,14 +141,14 @@ register_channel(ChannelNumber, ChannelPid, State = #connection_state{channels =
%% This peforms the reverse mapping so that you can lookup a channel pid
%% Let's hope that this lookup doesn't get too expensive .......
unregister_channel(ChannelPid, State = #connection_state{channels = Channels0}) when is_pid(ChannelPid)->
ReverseMapping = fun(Number, Pid) -> Pid == ChannelPid end,
ReverseMapping = fun(_Number, Pid) -> Pid == ChannelPid end,
Projection = dict:filter(ReverseMapping, Channels0),
%% TODO This differentiation is only necessary for the direct channel,
%% look into preventing the invocation of this method
Channels1 = case dict:fetch_keys(Projection) of
[] ->
Channels0;
[ChannelNumber|T] ->
[ChannelNumber|_] ->
dict:erase(ChannelNumber, Channels0)
end,
State#connection_state{channels = Channels1};
@ -158,9 +158,9 @@ unregister_channel(ChannelNumber, State = #connection_state{channels = Channels0
Channels1 = dict:erase(ChannelNumber, Channels0),
State#connection_state{channels = Channels1}.
allocate_channel_number([], Max)-> 1;
allocate_channel_number([], _Max)-> 1;
allocate_channel_number(Channels, Max) ->
allocate_channel_number(Channels, _Max) ->
MaxChannel = lists:max(Channels),
%% TODO check channel max and reallocate appropriately
MaxChannel + 1.
@ -179,7 +179,7 @@ init([InitialState, Handshake]) ->
{ok, State}.
%% Starts a new network channel.
handle_call({network, ChannelNumber, OutOfBand}, From, State) ->
handle_call({network, ChannelNumber, OutOfBand}, _From, State) ->
handle_start({ChannelNumber, OutOfBand},
fun amqp_network_driver:open_channel/3,
fun amqp_network_driver:close_channel/1,
@ -188,7 +188,7 @@ handle_call({network, ChannelNumber, OutOfBand}, From, State) ->
State);
%% Starts a new direct channel.
handle_call({direct, ChannelNumber, OutOfBand}, From, State) ->
handle_call({direct, ChannelNumber, OutOfBand}, _From, State) ->
handle_start({ChannelNumber, OutOfBand},
fun amqp_direct_driver:open_channel/3,
fun amqp_direct_driver:close_channel/1,
@ -201,7 +201,7 @@ handle_call({Mode, Close = #'connection.close'{}}, From, State) ->
close_connection(Mode, Close, From, State),
{stop,normal,State}.
handle_cast(Message, State) ->
handle_cast(_Message, State) ->
{noreply, State}.
%---------------------------------------------------------------------------
@ -228,14 +228,19 @@ handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) ->
io:format("Just trapping this exit and proceding to trap an exit from the client channel process~n"),
{noreply, State};
true ->
io:format("A hard error has occurred, this forces the connection to end~n"),
{stop,normal,State}
io:format("Hard error: (Code = ~p, Text = ~p)~n", [Code, Text]),
{stop, {hard_error, {Code, Text}}, State}
end;
%% Just the amqp channel shutting down, so unregister this channel
handle_info( {'EXIT', Pid, normal}, State) ->
NewState = unregister_channel(Pid, State),
{noreply, NewState};
% This is a special case for abruptly closed socket connections
handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) ->
{stop, {socket_error, Reason}, State};
handle_info( {'EXIT', Pid, Reason}, State) ->
io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]),
NewState = unregister_channel(Pid, State),
@ -245,7 +250,7 @@ handle_info( {'EXIT', Pid, Reason}, State) ->
% Rest of the gen_server callbacks
%---------------------------------------------------------------------------
terminate(Reason, State) -> ok.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) ->
State.

View File

@ -30,14 +30,21 @@
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-export([init/1, handle_info/2, terminate/2]).
-export([code_change/3, handle_call/2, handle_event/2]).
%---------------------------------------------------------------------------
% gen_event callbacks
%---------------------------------------------------------------------------
init(Args) ->
init(_Args) ->
{ok, []}.
handle_call(_Request, State) ->
{ok, not_understood, State}.
handle_event(_Event, State) ->
{ok, State}.
handle_info(shutdown, State) ->
io:format("---------------------------~n"),
io:format("AMQP Consumer SHUTDOWN~n"),
@ -56,13 +63,16 @@ handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) ->
io:format("---------------------------~n"),
{ok, State};
handle_info({#'basic.deliver'{consumer_tag = ConsumerTag},
{content, ClassId, Properties, PropertiesBin, Payload}},
handle_info({#'basic.deliver'{},
{content, _ClassId, _Properties, _PropertiesBin, Payload}},
State) ->
io:format("---------------------------~n"),
io:format("AMQP Consumer, rec'd: ~p~n", [ Payload ] ),
io:format("---------------------------~n"),
{ok, State}.
terminate(Args, State) ->
terminate(_Args, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -46,17 +46,18 @@ handshake(ConnectionState = #connection_state{username = User,
rabbit_access_control:check_vhost_access(#user{username = UserBin}, VHostPath),
ConnectionState.
open_channel({Channel,OutOfBand}, ChannelPid, State = #connection_state{username = User,
vhostpath = VHost}) ->
open_channel({_Channel, _OutOfBand}, ChannelPid,
State = #connection_state{username = User, vhostpath = VHost}) ->
UserBin = amqp_util:binary(User),
ReaderPid = WriterPid = ChannelPid,
Peer = rabbit_channel:start_link(ReaderPid, WriterPid, UserBin, VHost),
amqp_channel:register_direct_peer(ChannelPid, Peer),
State.
close_channel(WriterPid) -> ok.
close_channel(_WriterPid) -> ok.
close_connection(Close, From, State) -> gen_server:reply(From, #'connection.close_ok'{}).
close_connection(_Close, From, _State) ->
gen_server:reply(From, #'connection.close_ok'{}).
do(Writer, Method) -> rabbit_channel:do(Writer, Method).
do(Writer, Method, Content) -> rabbit_channel:do(Writer, Method, Content).

View File

@ -62,9 +62,9 @@ handshake(ConnectionState = #connection_state{serverhost = Host}) ->
%% because this will be parsed out of the frames received off the socket.
%% Hence, you have tell the singelton reader which Pids are intended to
%% process messages for a particular channel
open_channel({ChannelNumber, OutOfBand}, ChannelPid,
State = #connection_state{reader_pid = ReaderPid,
sock = Sock}) ->
open_channel({ChannelNumber, _OutOfBand}, ChannelPid,
#connection_state{reader_pid = ReaderPid,
sock = Sock}) ->
ReaderPid ! {ChannelPid, ChannelNumber},
WriterPid = start_writer(Sock, ChannelNumber),
amqp_channel:register_direct_peer(ChannelPid, WriterPid ).
@ -107,7 +107,7 @@ send_frame(Channel, Frame) ->
recv() ->
receive
{method, Method, Content} ->
{method, Method, _Content} ->
Method
end.
@ -116,11 +116,7 @@ recv() ->
%---------------------------------------------------------------------------
network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) ->
#'connection.start'{version_major = MajorVersion,
version_minor = MinorVersion,
server_properties = Properties,
mechanisms = Mechansims,
locales = Locales } = recv(),
#'connection.start'{} = recv(),
do(Writer, start_ok(State)),
#'connection.tune'{channel_max = ChannelMax,
frame_max = FrameMax,
@ -137,7 +133,7 @@ network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) ->
capabilities = <<"">>,
insist = false },
do(Writer, ConnectionOpen),
#'connection.open_ok'{known_hosts = KnownHosts} = recv(),
#'connection.open_ok'{} = recv(),
%% TODO What should I do with the KnownHosts?
State#connection_state{channel_max = ChannelMax, heartbeat = Heartbeat}.
@ -157,7 +153,7 @@ start_ok(#connection_state{username = Username, password = Password}) ->
start_reader(Sock, FramingPid) ->
process_flag(trap_exit, true),
put({channel, 0},{chpid, FramingPid}),
{ok, Ref} = prim_inet:async_recv(Sock, 7, -1),
{ok, _Ref} = prim_inet:async_recv(Sock, 7, -1),
reader_loop(Sock, undefined, undefined, undefined),
gen_tcp:close(Sock).
@ -171,14 +167,15 @@ reader_loop(Sock, Type, Channel, Length) ->
closed_ok ->
ok;
_ ->
{ok, Ref} = prim_inet:async_recv(Sock, 7, -1),
{ok, _Ref} = prim_inet:async_recv(Sock, 7, -1),
reader_loop(Sock, undefined, undefined, undefined)
end;
{inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} ->
{ok, Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
{ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
reader_loop(Sock, _Type, _Channel, PayloadSize);
{inet_async, Sock, Ref, {error, Reason}} ->
io:format("Have a look into this one: ~p~n",[Reason]);
{inet_async, Sock, _Ref, {error, Reason}} ->
io:format("Socket error: ~p~n", [Reason]),
exit({socket_error, Reason});
{heartbeat, Heartbeat} ->
rabbit_heartbeat:start_heartbeat(Sock, Heartbeat),
reader_loop(Sock, Type, Channel, Length);
@ -189,12 +186,13 @@ reader_loop(Sock, Type, Channel, Length) ->
io:format("Reader (~p) received timeout from heartbeat, exiting ~n",[self()]);
close ->
io:format("Reader (~p) received close command, exiting ~n",[self()]);
{'EXIT', Pid, Reason} ->
[H|T] = get_keys({chpid,Pid}),
{'EXIT', Pid, _Reason} ->
[H|_] = get_keys({chpid,Pid}),
erase(H),
reader_loop(Sock, Type, Channel, Length);
Other ->
io:format("Other ~p~n",[Other])
io:format("Unknown message type: ~p~n", [Other]),
exit({unknown_message_type, Other})
end.
start_framing_channel(ChannelPid, ChannelNumber) ->

View File

@ -31,107 +31,110 @@
-behaviour(gen_server).
-export([start/2]).
-export([call/4]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([start/2, stop/1]).
-export([call/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3,
handle_cast/2, handle_info/2]).
%---------------------------------------------------------------------------
% API
%---------------------------------------------------------------------------
start(BrokerConfig, TypeMapping) ->
{ok, RpcClientPid} = gen_server:start(?MODULE, [BrokerConfig, TypeMapping], []),
RpcClientPid.
start(Connection, Queue) ->
Channel = lib_amqp:start_channel(Connection),
{ok, Pid} = gen_server:start(?MODULE, [Channel, Queue], []),
Pid.
call(RpcClientPid, ContentType, Function, Args) ->
gen_server:call(RpcClientPid, {ContentType, [Function|Args]} ).
stop(Pid) ->
gen_server:call(Pid, stop).
call(RpcClientPid, Payload) ->
gen_server:call(RpcClientPid, {call, Payload}).
%---------------------------------------------------------------------------
% Plumbing
%---------------------------------------------------------------------------
% Sets up a reply queue for this client to listen on
setup_reply_queue(State = #rpc_client_state{broker_config = BrokerConfig}) ->
#broker_config{channel_pid = ChannelPid} = BrokerConfig,
QueueDeclare = #'queue.declare'{queue = <<>>,
passive = false, durable = false,
exclusive = false, auto_delete = false,
nowait = false, arguments = []},
#'queue.declare_ok'{queue = Q,
message_count = MessageCount,
consumer_count = ConsumerCount}
= amqp_channel:call(ChannelPid, QueueDeclare),
NewBrokerConfig = BrokerConfig#broker_config{queue = Q},
State#rpc_client_state{broker_config = NewBrokerConfig}.
setup_reply_queue(State = #rpc_client_state{channel = Channel}) ->
Q = lib_amqp:declare_queue(Channel, <<>>),
State#rpc_client_state{reply_queue = Q}.
% Sets up a consumer to handle rpc responses
setup_consumer(State) ->
ConsumerTag = amqp_rpc_util:register_consumer(State, self()),
% Registers this RPC client instance as a consumer to handle rpc responses
setup_consumer(State = #rpc_client_state{channel = Channel,
reply_queue = Q}) ->
ConsumerTag = lib_amqp:subscribe(Channel, Q, self()),
State#rpc_client_state{consumer_tag = ConsumerTag}.
% Publishes to the broker, stores the From address against
% the correlation id and increments the correlationid for
% the next request
publish({ContentType, [Function|Args] }, From,
State = #rpc_client_state{broker_config = BrokerConfig,
publish(Payload, From,
State = #rpc_client_state{channel = Channel,
reply_queue = Q,
exchange = X,
routing_key = RoutingKey,
correlation_id = CorrelationId,
continuations = Continuations,
type_mapping = TypeMapping}) ->
Payload = amqp_rpc_util:encode(call, ContentType, [Function|Args], TypeMapping ),
#broker_config{channel_pid = ChannelPid, queue = Q,
exchange = X, routing_key = RoutingKey} = BrokerConfig,
BasicPublish = #'basic.publish'{exchange = X,
routing_key = RoutingKey,
mandatory = false, immediate = false},
_CorrelationId = integer_to_list(CorrelationId),
Props = #'P_basic'{correlation_id = list_to_binary(_CorrelationId),
reply_to = Q, content_type = ContentType},
Content = #content{class_id = 60, %% TODO HARDCODED VALUE
properties = Props, properties_bin = 'none',
payload_fragments_rev = [Payload]},
amqp_channel:cast(ChannelPid, BasicPublish, Content),
NewContinuations = dict:store(_CorrelationId, From , Continuations),
State#rpc_client_state{correlation_id = CorrelationId + 1, continuations = NewContinuations}.
continuations = Continuations}) ->
Props = #'P_basic'{correlation_id = <<CorrelationId:64>>,
content_type = <<"application/octet-stream">>,
reply_to = Q},
lib_amqp:publish(Channel, X, RoutingKey, Payload, Props),
State#rpc_client_state{correlation_id = CorrelationId + 1,
continuations
= dict:store(CorrelationId, From, Continuations)}.
%---------------------------------------------------------------------------
% gen_server callbacks
%---------------------------------------------------------------------------
% Sets up a reply queue and consumer within an existing channel
init([BrokerConfig, TypeMapping]) ->
InitialState = #rpc_client_state{broker_config = BrokerConfig,
type_mapping = TypeMapping},
init([Channel, RoutingKey]) ->
InitialState = #rpc_client_state{channel = Channel,
exchange = <<>>,
routing_key = RoutingKey},
State = setup_reply_queue(InitialState),
NewState = setup_consumer(State),
{ok, NewState}.
terminate(Reason, State) ->
% Closes the channel this gen_server instance started
terminate(_Reason, #rpc_client_state{channel = Channel}) ->
lib_amqp:close_channel(Channel),
ok.
handle_call( Payload = {ContentType, [Function|Args] }, From, State) ->
% Handle the application initiated stop by unsubscribing from the
% reply queue - let handle_info/2 process the server's response
% in order to actually terminate this gen_server instance
handle_call(stop, _From, State = #rpc_client_state{channel = Channel,
consumer_tag = Tag}) ->
lib_amqp:unsubscribe(Channel, Tag),
{reply, ok, State};
handle_call({call, Payload}, From, State) ->
NewState = publish(Payload, From, State),
{noreply, NewState}.
handle_cast(Msg, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) ->
{noreply, State};
NewState = State#rpc_client_state{consumer_tag = ConsumerTag},
{noreply, NewState};
handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) ->
{noreply, State};
handle_info(#'basic.cancel_ok'{}, State) ->
{stop, normal, State};
handle_info({content, ClassId, Properties, PropertiesBin, Payload},
State = #rpc_client_state{continuations = Continuations,
type_mapping = TypeMapping}) ->
#'P_basic'{correlation_id = CorrelationId,
content_type = ContentType} = rabbit_framing:decode_properties(ClassId, PropertiesBin),
_CorrelationId = binary_to_list(CorrelationId),
From = dict:fetch(_CorrelationId, Continuations),
Reply = amqp_rpc_util:decode(ContentType, Payload, TypeMapping),
gen_server:reply(From, Reply),
NewContinuations = dict:erase(_CorrelationId, Continuations),
{noreply, State#rpc_client_state{continuations = NewContinuations}}.
handle_info({#'basic.deliver'{},
{content, ClassId, _Props, PropertiesBin, [Payload] }},
State = #rpc_client_state{continuations = Conts}) ->
#'P_basic'{correlation_id = CorrelationId}
= rabbit_framing:decode_properties(ClassId, PropertiesBin),
<<Id:64>> = CorrelationId,
From = dict:fetch(Id, Conts),
gen_server:reply(From, Payload),
{noreply, State#rpc_client_state{continuations = dict:erase(Id, Conts) }}.
code_change(_OldVsn, State, _Extra) ->
State.

View File

@ -1,122 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is the RabbitMQ Erlang Client.
%%
%% The Initial Developers of the Original Code are LShift Ltd.,
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd., Cohesive Financial
%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
%% Technologies Ltd.;
%%
%% All Rights Reserved.
%%
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
-module(amqp_rpc_handler).
-behaviour(gen_server).
-include_lib("rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-include("amqp_client.hrl").
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
%---------------------------------------------------------------------------
% gen_server callbacks
%---------------------------------------------------------------------------
init([ServerName, TypeMapping, Username, Password,
BC = #broker_config{exchange = X, routing_key = RoutingKey,
queue = Q, bind_key = BindKey}]) ->
Connection = amqp_connection:start(Username, Password),
ChannelPid = test_util:setup_channel(Connection),
ok = test_util:setup_exchange(ChannelPid, Q, X, BindKey),
BrokerConfig = BC#broker_config{channel_pid = ChannelPid},
State = #rpc_handler_state{server_name = ServerName,
type_mapping = TypeMapping,
broker_config = BrokerConfig},
BasicConsume = #'basic.consume'{queue = Q,
consumer_tag = <<"">>,
no_local = false, no_ack = true, exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, self()),
init([State]);
init([State = #rpc_handler_state{server_name = ServerName}]) ->
%% TODO Think about registering gen_servers and linking them to this....
%% it's probably a bad idea because then the server is tied to the rpc handler
{ok, Pid} = gen_server:start_link(ServerName, [], []),
{ok, State#rpc_handler_state{server_pid = Pid}}.
handle_info(shutdown, State) ->
terminate(shutdown, State);
handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) ->
{noreply, State};
handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) ->
{noreply, State};
handle_info({content, ClassId, Properties, PropertiesBin, Payload},
State = #rpc_handler_state{broker_config = BrokerConfig,
server_pid = ServerPid,
type_mapping = TypeMapping}) ->
#broker_config{channel_pid = ChannelPid, exchange = X} = BrokerConfig,
Props = #'P_basic'{correlation_id = CorrelationId,
reply_to = Q,
content_type = ContentType}
= rabbit_framing:decode_properties(ClassId, PropertiesBin),
io:format("ABOUT 2---------> ~p / ~p ~n",[Payload,TypeMapping]),
T = amqp_rpc_util:decode(ContentType, Payload, TypeMapping),
io:format("---------> ~p~n",[T]),
Response = case amqp_rpc_util:decode(ContentType, Payload, TypeMapping) of
{error, Encoded} ->
Encoded;
[Function,Arguments] ->
%% This doesn't seem to be the right way to do this dispatch
FunctionName = list_to_atom(binary_to_list(Function)),
case gen_server:call(ServerPid, [FunctionName|Arguments]) of
{'EXIT', Reason} ->
amqp_rpc_util:encode(fault, ContentType, Reason);
Reply ->
amqp_rpc_util:encode(reply, ContentType, Reply, TypeMapping)
end
end,
BasicPublish = #'basic.publish'{exchange = <<"">>,
routing_key = Q,
mandatory = false, immediate = false},
ReplyProps = #'P_basic'{correlation_id = CorrelationId,
content_type = ContentType},
Content = #content{class_id = 60, %% TODO HARDCODED VALUE
properties = ReplyProps, properties_bin = 'none',
payload_fragments_rev = [Response]},
amqp_channel:cast(ChannelPid, BasicPublish, Content),
{noreply, State}.
%---------------------------------------------------------------------------
% Rest of the gen_server callbacks
%---------------------------------------------------------------------------
handle_call(Message, From, State) ->
{noreply, State}.
handle_cast(Message, State) ->
{noreply, State}.
terminate(Reason, State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
State.

101
deps/amqp_client/src/amqp_rpc_server.erl vendored Normal file
View File

@ -0,0 +1,101 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is the RabbitMQ Erlang Client.
%%
%% The Initial Developers of the Original Code are LShift Ltd.,
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd., Cohesive Financial
%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
%% Technologies Ltd.;
%%
%% All Rights Reserved.
%%
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
-module(amqp_rpc_server).
-behaviour(gen_server).
-include_lib("rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-include("amqp_client.hrl").
-export([init/1, terminate/2, code_change/3, handle_call/3,
handle_cast/2, handle_info/2]).
-export([start/3]).
-export([stop/1]).
%---------------------------------------------------------------------------
% API
%---------------------------------------------------------------------------
start(Connection, Queue, Fun) ->
{ok, Pid} = gen_server:start(?MODULE, [Connection, Queue, Fun], []),
Pid.
stop(Pid) ->
gen_server:call(Pid, stop).
%---------------------------------------------------------------------------
% gen_server callbacks
%---------------------------------------------------------------------------
init([Connection, Queue, Fun]) ->
Channel = lib_amqp:start_channel(Connection),
lib_amqp:declare_queue(Channel, Queue),
Tag = lib_amqp:subscribe(Channel, Queue, self()),
State = #rpc_server_state{channel = Channel,
consumer_tag = Tag,
handler = Fun},
{ok, State}.
handle_info(shutdown, State = #rpc_server_state{channel = Channel,
consumer_tag = Tag}) ->
Reply = lib_amqp:unsubscribe(Channel, Tag),
{noreply, Reply, State};
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
handle_info(#'basic.cancel_ok'{}, State) ->
{stop, normal, State};
handle_info({#'basic.deliver'{},
{content, ClassId, _Props, PropertiesBin, [Payload] }},
State = #rpc_server_state{handler = Fun, channel = Channel}) ->
#'P_basic'{correlation_id = CorrelationId,
reply_to = Q} =
rabbit_framing:decode_properties(ClassId, PropertiesBin),
Response = Fun(Payload),
Properties = #'P_basic'{correlation_id = CorrelationId},
lib_amqp:publish(Channel, <<>>, Q, Response, Properties),
{noreply, State}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
%---------------------------------------------------------------------------
% Rest of the gen_server callbacks
%---------------------------------------------------------------------------
handle_cast(_Message, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
State.

View File

@ -1,66 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is the RabbitMQ Erlang Client.
%%
%% The Initial Developers of the Original Code are LShift Ltd.,
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd., Cohesive Financial
%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
%% Technologies Ltd.;
%%
%% All Rights Reserved.
%%
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
-module(amqp_rpc_util).
-ifndef(Hessian).
-define(Hessian, <<"application/x-hessian">>).
-endif.
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-include("amqp_client.hrl").
-export([register_consumer/2]).
-export([encode/3,encode/4,decode/3]).
% Registers a consumer in this channel
register_consumer(RpcClientState = #rpc_client_state{broker_config = BrokerConfig}, Consumer) ->
#broker_config{channel_pid = ChannelPid, queue = Q} = BrokerConfig,
Tag = <<"">>,
BasicConsume = #'basic.consume'{queue = Q,
consumer_tag = Tag,
no_local = false, no_ack = true, exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, Consumer),
RpcClientState#rpc_client_state{consumer_tag = ConsumerTag}.
%---------------------------------------------------------------------------
% Encoding and decoding
%---------------------------------------------------------------------------
decode(?Hessian, [H|T], State) ->
hessian:decode(H, State).
encode(fault, ?Hessian, Reason) ->
hessian:encode(fault, internal_rpc_error , Reason , []).
encode(call, ?Hessian, [Function|Args], State) ->
hessian:encode(call, Function, Args, State);
encode(reply, ?Hessian, Payload, State) when is_tuple(Payload) ->
hessian:encode(reply, Payload, State);
encode(reply, ?Hessian, Payload, State) ->
hessian:encode(reply, Payload, State).

View File

@ -29,6 +29,7 @@
-define(RPC_SLEEP, 500).
-export([test_coverage/0]).
-export([test_channel_flow/0]).
-include_lib("eunit/include/eunit.hrl").
@ -47,12 +48,22 @@ lifecycle_test() -> test_util:lifecycle_test(new_connection()).
basic_ack_test() ->test_util:basic_ack_test(new_connection()).
command_serialization_test() -> test_util:command_serialization_test(new_connection()).
%----------------------------------------------------------------------------
% This must be kicked off manually because it can only be run after Rabbit
% has been running for 1 minute
test_channel_flow() ->
test_util:channel_flow_test(new_connection()).
%----------------------------------------------------------------------------
% Negative Tests
non_existent_exchange_test() ->
negative_test_util:non_existent_exchange_test(new_connection()).
queue_unbind_test() ->
test_util:queue_unbind_test(new_connection()).
%----------------------------------------------------------------------------
%% Common Functions

View File

@ -31,19 +31,45 @@ delete_exchange(Channel, X) ->
if_unused = false, nowait = false},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete).
%---------------------------------------------------------------------------
% TODO This whole section of optional properties and mandatory flags
% may have to be re-thought
publish(Channel, X, RoutingKey, Payload) ->
publish(Channel, X, RoutingKey, Payload, false).
publish(Channel, X, RoutingKey, Payload, Mandatory) ->
publish(Channel, X, RoutingKey, Payload, Mandatory)
when is_boolean(Mandatory)->
publish(Channel, X, RoutingKey, Payload, Mandatory,
amqp_util:basic_properties());
publish(Channel, X, RoutingKey, Payload, Properties) ->
publish(Channel, X, RoutingKey, Payload, false, Properties).
publish(Channel, X, RoutingKey, Payload, Mandatory, Properties) ->
publish_internal(fun amqp_channel:call/3,
Channel, X, RoutingKey, Payload, Mandatory, Properties).
async_publish(Channel, X, RoutingKey, Payload) ->
async_publish(Channel, X, RoutingKey, Payload, false).
async_publish(Channel, X, RoutingKey, Payload, Mandatory) ->
publish_internal(fun amqp_channel:cast/3, Channel, X, RoutingKey,
Payload, Mandatory, amqp_util:basic_properties()).
publish_internal(Fun, Channel, X, RoutingKey,
Payload, Mandatory, Properties) ->
BasicPublish = #'basic.publish'{exchange = X,
routing_key = RoutingKey,
mandatory = Mandatory, immediate = false},
{ClassId, MethodId} = rabbit_framing:method_id('basic.publish'),
mandatory = Mandatory,
immediate = false},
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = amqp_util:basic_properties(),
properties_bin = none,
payload_fragments_rev = [Payload]},
amqp_channel:cast(Channel, BasicPublish, Content).
properties = Properties,
properties_bin = none,
payload_fragments_rev = [Payload]},
Fun(Channel, BasicPublish, Content).
%---------------------------------------------------------------------------
close_channel(Channel) ->
ChannelClose = #'channel.close'{reply_code = 200, reply_text = <<"Goodbye">>,
@ -51,31 +77,26 @@ close_channel(Channel) ->
#'channel.close_ok'{} = amqp_channel:call(Channel, ChannelClose),
ok.
teardown(Connection, Channel) ->
close_channel(Channel),
close_connection(Connection) ->
ConnectionClose = #'connection.close'{reply_code = 200, reply_text = <<"Goodbye">>,
class_id = 0, method_id = 0},
#'connection.close_ok'{} = amqp_connection:close(Connection, ConnectionClose),
ok.
teardown(Connection, Channel) ->
close_channel(Channel),
close_connection(Connection).
get(Channel, Q) -> get(Channel, Q, true).
get(Channel, Q, NoAck) ->
BasicGet = #'basic.get'{queue = Q, no_ack = NoAck},
{Method, Content} = amqp_channel:call(Channel, BasicGet),
case Method of
'basic.get_empty' ->
'basic.get_empty';
Other ->
#'basic.get_ok'{delivery_tag = DeliveryTag,
redelivered = Redelivered,
exchange = X,
routing_key = RoutingKey,
message_count = MessageCount} = Method,
#content{class_id = ClassId,
properties = Properties,
properties_bin = PropertiesBin,
payload_fragments_rev = PayloadFragments} = Content,
'basic.get_empty' -> 'basic.get_empty';
_ ->
#'basic.get_ok'{delivery_tag = DeliveryTag} = Method,
case NoAck of
true -> Content;
false -> {DeliveryTag, Content}
@ -100,22 +121,24 @@ subscribe(Channel, Q, Consumer, Tag, NoAck) ->
consumer_tag = Tag,
no_local = false, no_ack = NoAck,
exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicConsume, Consumer),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:subscribe(Channel,BasicConsume, Consumer),
ConsumerTag.
unsubscribe(Channel, Tag) ->
BasicCancel = #'basic.cancel'{consumer_tag = Tag, nowait = false},
#'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicCancel),
#'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel),
ok.
declare_queue(Channel) ->
declare_queue(Channel, <<>>).
declare_queue(Channel, Q) ->
QueueDeclare = #'queue.declare'{queue = Q,
passive = false, durable = false,
exclusive = false, auto_delete = false,
nowait = false, arguments = []},
#'queue.declare_ok'{queue = Q1,
message_count = MessageCount,
consumer_count = ConsumerCount}
#'queue.declare_ok'{queue = Q1}
= amqp_channel:call(Channel, QueueDeclare),
Q1.
@ -131,3 +154,7 @@ bind_queue(Channel, X, Q, Binding) ->
routing_key = Binding, nowait = false, arguments = []},
#'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind).
unbind_queue(Channel, X, Q, Binding) ->
Unbind = #'queue.unbind'{queue = Q, exchange = X,
routing_key = Binding, arguments = []},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Unbind).

View File

@ -43,4 +43,5 @@ non_existent_exchange_test(Connection) ->
end,
?assertNot(is_process_alive(Channel)),
{Pid,_} = Connection,
?assert(is_process_alive(Pid)).
?assert(is_process_alive(Pid)),
lib_amqp:close_connection(Connection).

View File

@ -52,10 +52,16 @@ basic_ack_test() ->
channel_lifecycle_test() ->
test_util:channel_lifecycle_test(new_connection()).
queue_unbind_test() ->
test_util:queue_unbind_test(new_connection()).
command_serialization_test() ->
test_util:command_serialization_test(new_connection()).
rpc_test() ->
test_util:rpc_test(new_connection()).
%----------------------------------------------------------------------------
% Negative Tests

View File

@ -35,7 +35,13 @@
-record(publish,{q, x, routing_key, bind_key, payload,
mandatory = false, immediate = false}).
% The latch constant defines how many processes are spawned in order
% to run certain functionality in parallel. It follows the standard
% countdown latch pattern.
-define(Latch, 100).
% The wait constant defines how long a consumer waits before it
% unsubscribes
-define(Wait, 200).
%%%%
@ -70,8 +76,6 @@ queue_exchange_binding(Channel, X, Parent, Tag) ->
end,
Q = <<"a.b.c",Tag:32>>,
Binding = <<"a.b.c.*">>,
RoutingKey = <<"a.b.c.d">>,
Payload = <<"foobar">>,
Q1 = lib_amqp:declare_queue(Channel, Q),
?assertMatch(Q, Q1),
lib_amqp:bind_queue(Channel, X, Q, Binding),
@ -95,18 +99,40 @@ command_serialization_test(Connection) ->
Q1 = lib_amqp:declare_queue(Channel, Q),
?assertMatch(Q, Q1),
Parent ! finished
end) || Tag <- lists:seq(1,?Latch)],
end) || _ <- lists:seq(1,?Latch)],
latch_loop(?Latch),
lib_amqp:teardown(Connection, Channel).
queue_unbind_test(Connection) ->
X = <<"eggs">>, Q = <<"foobar">>, Key = <<"quay">>,
Payload = <<"foobar">>,
Channel = lib_amqp:start_channel(Connection),
lib_amqp:declare_exchange(Channel, X),
lib_amqp:declare_queue(Channel, Q),
lib_amqp:bind_queue(Channel, X, Q, Key),
lib_amqp:publish(Channel, X, Key, Payload),
get_and_assert_equals(Channel, Q, Payload),
lib_amqp:unbind_queue(Channel, X, Q, Key),
lib_amqp:publish(Channel, X, Key, Payload),
get_and_assert_empty(Channel, Q),
lib_amqp:teardown(Connection, Channel).
get_and_assert_empty(Channel, Q) ->
BasicGetEmpty = lib_amqp:get(Channel, Q, false),
?assertMatch('basic.get_empty', BasicGetEmpty).
get_and_assert_equals(Channel, Q, Payload) ->
Content = lib_amqp:get(Channel, Q),
#content{payload_fragments_rev = PayloadFragments} = Content,
?assertMatch([Payload], PayloadFragments).
basic_get_test(Connection) ->
Channel = lib_amqp:start_channel(Connection),
{ok, Q} = setup_publish(Channel),
% TODO: This could be refactored to use get_and_assert_equals,
% get_and_assert_empty .... would require another bug though :-)
Content = lib_amqp:get(Channel, Q),
#content{class_id = ClassId,
properties = Properties,
properties_bin = PropertiesBin,
payload_fragments_rev = PayloadFragments} = Content,
#content{payload_fragments_rev = PayloadFragments} = Content,
?assertMatch([<<"foobar">>], PayloadFragments),
BasicGetEmpty = lib_amqp:get(Channel, Q, false),
?assertMatch('basic.get_empty', BasicGetEmpty),
@ -125,27 +151,23 @@ basic_return_test(Connection) ->
timer:sleep(200),
receive
{BasicReturn = #'basic.return'{}, Content} ->
#'basic.return'{reply_code = ReplyCode,
reply_text = ReplyText,
exchange = X,
routing_key = RoutingKey} = BasicReturn,
#'basic.return'{reply_text = ReplyText,
exchange = X} = BasicReturn,
?assertMatch(<<"unroutable">>, ReplyText),
#content{class_id = ClassId,
properties = Props,
properties_bin = PropsBin,
payload_fragments_rev = Payload2} = Content,
#content{payload_fragments_rev = Payload2} = Content,
?assertMatch([Payload], Payload2);
WhatsThis ->
%% TODO investigate where this comes from
io:format(">>>Rec'd ~p/~p~n",[WhatsThis])
io:format("Spurious message ~p~n",[WhatsThis])
after 2000 ->
exit(no_return_received)
end.
end,
lib_amqp:teardown(Connection, Channel).
basic_ack_test(Connection) ->
Channel = lib_amqp:start_channel(Connection),
{ok, Q} = setup_publish(Channel),
{DeliveryTag, Content} = lib_amqp:get(Channel, Q, false),
{DeliveryTag, _} = lib_amqp:get(Channel, Q, false),
lib_amqp:ack(Channel, DeliveryTag),
lib_amqp:teardown(Connection, Channel).
@ -178,7 +200,7 @@ basic_recover_test(Connection) ->
end,
lib_amqp:publish(Channel, <<>>, Q, <<"foobar">>),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
{#'basic.deliver'{}, _} ->
%% no_ack set to false, but don't send ack
ok
after 2000 ->
@ -187,7 +209,7 @@ basic_recover_test(Connection) ->
BasicRecover = #'basic.recover'{requeue = true},
amqp_channel:cast(Channel,BasicRecover),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag2}, Content2} ->
{#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
lib_amqp:ack(Channel, DeliveryTag2)
after 2000 ->
exit(did_not_receive_second_message)
@ -195,10 +217,150 @@ basic_recover_test(Connection) ->
lib_amqp:teardown(Connection, Channel).
% QOS is not yet implemented in RabbitMQ
basic_qos_test(Connection) -> ok.
basic_qos_test(Connection) ->
lib_amqp:close_connection(Connection).
% Reject is not yet implemented in RabbitMQ
basic_reject_test(Connection) -> ok.
basic_reject_test(Connection) ->
lib_amqp:close_connection(Connection).
%----------------------------------------------------------------------------
% Unit test for the direct client
% This just relies on the fact that a fresh Rabbit VM must consume more than
% 0.1 pc of the system memory:
% 0. Wait 1 minute to let memsup do stuff
% 1. Make sure that the high watermark is set high
% 2. Start a process to receive the pause and resume commands from the broker
% 3. Register this as flow control notification handler
% 4. Let the system settle for a little bit
% 5. Set the threshold to the lowest possible value
% 6. When the flow handler receives the pause command, it sets the watermark
% to a high value in order to get the broker to send the resume command
% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and fail
channel_flow_test(Connection) ->
X = <<"amq.direct">>,
K = Payload = <<"x">>,
memsup:set_sysmem_high_watermark(0.99),
timer:sleep(1000),
Channel = lib_amqp:start_channel(Connection),
Parent = self(),
Child = spawn_link(fun() ->
receive
#'channel.flow'{active = false} ->
blocked = lib_amqp:publish(Channel,
X, K, Payload),
memsup:set_sysmem_high_watermark(0.99),
receive
#'channel.flow'{active = true} ->
Parent ! ok
end
end
end),
amqp_channel:register_flow_handler(Channel, Child),
timer:sleep(1000),
memsup:set_sysmem_high_watermark(0.001),
receive
ok -> ok
after 10000 ->
io:format("Are you sure that you have waited 1 minute?~n"),
exit(did_not_receive_channel_flow)
end.
%----------------------------------------------------------------------------
% This is a test, albeit not a unit test, to see if the producer
% handles the effect of being throttled.
channel_flow_sync(Connection) ->
start_channel_flow(Connection, fun lib_amqp:publish/4).
channel_flow_async(Connection) ->
start_channel_flow(Connection, fun lib_amqp:async_publish/4).
start_channel_flow(Connection, PublishFun) ->
crypto:start(),
X = <<"amq.direct">>,
Key = uuid(),
Producer = spawn_link(
fun() ->
Channel = lib_amqp:start_channel(Connection),
Parent = self(),
FlowHandler = spawn_link(fun() -> cf_handler_loop(Parent) end),
amqp_channel:register_flow_handler(Channel, FlowHandler),
cf_producer_loop(Channel, X, Key, PublishFun, 0)
end),
Consumer = spawn_link(
fun() ->
Channel = lib_amqp:start_channel(Connection),
Q = lib_amqp:declare_queue(Channel),
lib_amqp:bind_queue(Channel, X, Q, Key),
Tag = lib_amqp:subscribe(Channel, Q, self()),
cf_consumer_loop(Channel, Tag)
end),
{Producer, Consumer}.
cf_consumer_loop(Channel, Tag) ->
receive
#'basic.consume_ok'{} -> cf_consumer_loop(Channel, Tag);
#'basic.cancel_ok'{} -> ok;
{#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
lib_amqp:ack(Channel, DeliveryTag),
cf_consumer_loop(Channel, Tag);
stop ->
lib_amqp:unsubscribe(Channel, Tag),
ok
end.
cf_producer_loop(Channel, X, Key, PublishFun, N) when N rem 5000 =:= 0 ->
io:format("Producer (~p) has sent about ~p messages since it started~n",
[self(), N]),
cf_producer_loop(Channel, X, Key, PublishFun, N + 1);
cf_producer_loop(Channel, X, Key, PublishFun, N) ->
case PublishFun(Channel, X, Key, crypto:rand_bytes(10000)) of
blocked ->
io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n",
[self()]),
receive
resume ->
io:format("Producer (~p) has woken up :-)~n", [self()]),
cf_producer_loop(Channel, X, Key, PublishFun, N + 1)
end;
_ ->
cf_producer_loop(Channel, X, Key, PublishFun, N + 1)
end.
cf_handler_loop(Producer) ->
receive
#'channel.flow'{active = false} ->
io:format("Producer throttling ON~n"),
cf_handler_loop(Producer);
#'channel.flow'{active = true} ->
io:format("Producer throttling OFF, waking up producer (~p)~n",
[Producer]),
Producer ! resume,
cf_handler_loop(Producer);
stop -> ok
end.
%---------------------------------------------------------------------------
% This tests whether RPC over AMQP produces the same result as invoking the
% same argument against the same underlying gen_server instance.
rpc_test(Connection) ->
Q = uuid(),
Fun = fun(X) -> X + 1 end,
RPCHandler = fun(X) -> term_to_binary(Fun(binary_to_term(X))) end,
Server = amqp_rpc_server:start(Connection, Q, RPCHandler),
Client = amqp_rpc_client:start(Connection, Q),
Input = 1,
Reply = amqp_rpc_client:call(Client, term_to_binary(Input)),
Expected = Fun(Input),
DecodedReply = binary_to_term(Reply),
?assertMatch(Expected, DecodedReply),
amqp_rpc_client:stop(Client),
amqp_rpc_server:stop(Server),
ok.
%---------------------------------------------------------------------------
setup_publish(Channel) ->
Publish = #publish{routing_key = <<"a.b.c.d">>,
@ -211,14 +373,13 @@ setup_publish(Channel) ->
setup_publish(Channel, #publish{routing_key = RoutingKey,
q = Q, x = X,
bind_key = BindKey, payload = Payload,
mandatory = Mandatory,
immediate = Immediate}) ->
bind_key = BindKey,
payload = Payload}) ->
ok = setup_exchange(Channel, Q, X, BindKey),
lib_amqp:publish(Channel, X, RoutingKey, Payload),
{ok, Q}.
teardown_test(Connection = {ConnectionPid, Mode}) ->
teardown_test(Connection = {ConnectionPid, _Mode}) ->
Channel = lib_amqp:start_channel(Connection),
?assertMatch(true, is_process_alive(Channel)),
?assertMatch(true, is_process_alive(ConnectionPid)),

View File

@ -1,59 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is the RabbitMQ Erlang Client.
%%
%% The Initial Developers of the Original Code are LShift Ltd.,
%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd., Cohesive Financial
%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
%% Technologies Ltd.;
%%
%% All Rights Reserved.
%%
%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
%%
-module(transport_agnostic_server).
-export([start/1]).
-behaviour(gen_server).
-export([start/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
%---------------------------------------------------------------------------
% gen_server callbacks
%---------------------------------------------------------------------------
start(Args) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
Pid.
init(Args) ->
{ok, []}.
terminate(Reason, State) ->
ok.
handle_call(Payload, From, State) ->
{reply, something, State}.
handle_cast(Msg, State) ->
{noreply, State}.
handle_info(Msg, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
State.