From 446251e4f83e2e2c20802ba93d2a8f69691f4fad Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 8 Jan 2009 12:51:02 +0000 Subject: [PATCH] Fixed punctuation --- deps/amqp_client/src/amqp_channel.erl | 161 ++++++++++--------- deps/amqp_client/src/amqp_connection.erl | 101 +++++++----- deps/amqp_client/src/amqp_direct_driver.erl | 22 ++- deps/amqp_client/src/amqp_network_driver.erl | 68 ++++---- deps/amqp_client/src/amqp_rpc_client.erl | 2 +- deps/amqp_client/src/amqp_util.erl | 4 +- deps/amqp_client/src/direct_client_test.erl | 41 +++-- deps/amqp_client/src/lib_amqp.erl | 71 +++++--- deps/amqp_client/src/negative_test_util.erl | 4 +- deps/amqp_client/src/network_client_test.erl | 28 ++-- deps/amqp_client/src/test_util.erl | 120 +++++++------- 11 files changed, 361 insertions(+), 261 deletions(-) diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index bdc10c70f0..9cf8d1730e 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -32,15 +32,16 @@ -behaviour(gen_server). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). -export([call/2, call/3, cast/2, cast/3]). -export([subscribe/3]). -export([register_direct_peer/2]). -export([register_return_handler/2]). -export([register_flow_handler/2]). -%% This diagram shows the interaction between the different component processes -%% in an AMQP client scenario. +%% This diagram shows the interaction between the different component +%% processes in an AMQP client scenario. %% %% message* / reply* +-------+ %% +---------------------- | queue | @@ -62,16 +63,17 @@ %% | %% [consumer tag --> consumer pid] %% -%% * These notifications are processed asynchronously via handle_info/2 callbacks +%% These notifications are processed asynchronously via +%% handle_info/2 callbacks -%--------------------------------------------------------------------------- -% AMQP Channel API methods -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% AMQP Channel API methods +%%--------------------------------------------------------------------------- %% Generic AMQP RPC mechanism that expects a pseudo synchronous response call(Channel, Method) -> gen_server:call(Channel, {call, Method}). - + %% Generic AMQP send mechanism with content call(Channel, Method, Content) -> gen_server:call(Channel, {call, Method, Content}). @@ -84,18 +86,18 @@ cast(Channel, Method) -> cast(Channel, Method, Content) -> gen_server:cast(Channel, {cast, Method, Content}). -%--------------------------------------------------------------------------- -% Consumer registration -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Consumer registration +%%--------------------------------------------------------------------------- %% Registers a consumer pid with the channel subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) -> gen_server:call(Channel, {BasicConsume, Consumer}). -%--------------------------------------------------------------------------- -% Direct peer registration -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Direct peer registration +%%--------------------------------------------------------------------------- %% Registers the direct channel peer with the state of this channel. %% This registration occurs after the amqp_channel gen_server instance @@ -116,15 +118,15 @@ register_return_handler(Channel, ReturnHandler) -> register_flow_handler(Channel, FlowHandler) -> gen_server:cast(Channel, {register_flow_handler, FlowHandler} ). -%--------------------------------------------------------------------------- -% Internal plumbing -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Internal plumbing +%%--------------------------------------------------------------------------- rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer, rpc_requests = RequestQueue, do2 = Do2}) -> % Enqueue the incoming RPC request to serialize RPC dispatching - NewRequestQueue = queue:in({From,Method}, RequestQueue), + NewRequestQueue = queue:in({From, Method}, RequestQueue), NewState = State#channel_state{rpc_requests = NewRequestQueue}, case queue:len(NewRequestQueue) of 1 -> @@ -134,9 +136,10 @@ rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer, end, {noreply, NewState}. -rpc_bottom_half(#'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText},State) -> - io:format("Channel received close from peer, code: ~p , message: ~p~n",[ReplyCode,ReplyText]), +rpc_bottom_half(#'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText}, State) -> + io:format("Channel received close from peer, code: ~p , message: ~p~n", + [ReplyCode,ReplyText]), NewState = channel_cleanup(State), {stop, normal, NewState}; @@ -145,9 +148,9 @@ rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer, do2 = Do2}) -> NewRequestQueue = case queue:out(RequestQueue) of - {empty, {[],[]}} -> exit(empty_rpc_bottom_half); - {{value, {From, _}}, Q} -> gen_server:reply(From, Reply), - Q + {empty, {[], []}} -> exit(empty_rpc_bottom_half); + {{value, {From, _}}, Q} -> gen_server:reply(From, Reply), + Q end, case queue:is_empty(NewRequestQueue) of true -> ok; @@ -163,15 +166,18 @@ resolve_consumer(_ConsumerTag, #channel_state{consumers = []}) -> resolve_consumer(ConsumerTag, #channel_state{consumers = Consumers}) -> dict:fetch(ConsumerTag, Consumers). -register_consumer(ConsumerTag, Consumer, State = #channel_state{consumers = Consumers0}) -> +register_consumer(ConsumerTag, Consumer, + State = #channel_state{consumers = Consumers0}) -> Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0), State#channel_state{consumers = Consumers1}. -unregister_consumer(ConsumerTag, State = #channel_state{consumers = Consumers0}) -> +unregister_consumer(ConsumerTag, + State = #channel_state{consumers = Consumers0}) -> Consumers1 = dict:erase(ConsumerTag, Consumers0), State#channel_state{consumers = Consumers1}. -shutdown_writer(State = #channel_state{close_fun = CloseFun, writer_pid = WriterPid}) -> +shutdown_writer(State = #channel_state{close_fun = CloseFun, + writer_pid = WriterPid}) -> CloseFun(WriterPid), State. @@ -193,41 +199,44 @@ return_handler(State = #channel_state{return_handler_pid = undefined}) -> return_handler(State = #channel_state{return_handler_pid = ReturnHandler}) -> {ReturnHandler, State}. -handle_method(BasicConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag}, - State = #channel_state{anon_sub_requests = Anon, - tagged_sub_requests = Tagged}) -> +handle_method(ConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag}, + State = #channel_state{anon_sub_requests = Anon, + tagged_sub_requests = Tagged}) -> {_From, Consumer, State0} = - case dict:find(ConsumerTag,Tagged) of + case dict:find(ConsumerTag, Tagged) of {ok, {F,C}} -> NewTagged = dict:erase(ConsumerTag,Tagged), {F,C,State#channel_state{tagged_sub_requests = NewTagged}}; error -> case queue:out(Anon) of - {empty,_} -> + {empty, _} -> exit(anonymous_queue_empty, ConsumerTag); - {{value, {F,C}}, NewAnon} -> - {F,C,State#channel_state{anon_sub_requests = NewAnon}} + {{value, {F, C}}, NewAnon} -> + {F, C, + State#channel_state{anon_sub_requests = NewAnon}} end end, - Consumer ! BasicConsumeOk, + Consumer ! ConsumeOk, State1 = register_consumer(ConsumerTag, Consumer, State0), - rpc_bottom_half(BasicConsumeOk,State1); + rpc_bottom_half(ConsumeOk, State1); -handle_method(BasicCancelOk = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) -> +handle_method(CancelOk = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + State) -> Consumer = resolve_consumer(ConsumerTag, State), - Consumer ! BasicCancelOk, + Consumer ! CancelOk, NewState = unregister_consumer(ConsumerTag, State), - rpc_bottom_half(BasicCancelOk, NewState); + rpc_bottom_half(CancelOk, NewState); -handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) -> - {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State), +handle_method(CloseOk = #'channel.close_ok'{}, State) -> + {noreply, NewState} = rpc_bottom_half(CloseOk, State), {stop, normal, NewState}; %% This handles the flow control flag that the broker initiates. %% If defined, it informs the flow control handler to suspend submitting %% any content bearing methods handle_method(Flow = #'channel.flow'{active = Active}, - State = #channel_state{writer_pid = Writer, do2 = Do2, + State = #channel_state{writer_pid = Writer, + do2 = Do2, flow_handler_pid = FlowHandler}) -> case FlowHandler of undefined -> ok; @@ -239,9 +248,10 @@ handle_method(Flow = #'channel.flow'{active = Active}, handle_method(Method, State) -> rpc_bottom_half(Method, State). -handle_method(BasicDeliver = #'basic.deliver'{consumer_tag = ConsumerTag}, Content, State) -> +handle_method(Deliver = #'basic.deliver'{consumer_tag = ConsumerTag}, + Content, State) -> Consumer = resolve_consumer(ConsumerTag, State), - Consumer ! {BasicDeliver, Content}, + Consumer ! {Deliver, Content}, {noreply, State}; %% Why is the consumer a handle_method/3 call with the network driver, @@ -257,9 +267,9 @@ handle_method(BasicReturn = #'basic.return'{}, Content, State) -> handle_method(Method, Content, State) -> rpc_bottom_half( {Method, Content} , State). -%--------------------------------------------------------------------------- -% gen_server callbacks -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% gen_server callbacks +%%--------------------------------------------------------------------------- init([InitialState]) -> {ok, InitialState}. @@ -298,7 +308,8 @@ handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer}, rpc_top_half(Method, From, NewState). %% Standard implementation of the cast/2 command -handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer, do2 = Do2}) -> +handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer, + do2 = Do2}) -> Do2(Writer, Method), {noreply, State}; @@ -319,7 +330,7 @@ handle_cast({cast, Method, Content}, %% Registers the direct channel peer when using the direct client handle_cast({register_direct_peer, Peer}, State) -> link(Peer), - process_flag(trap_exit,true), + process_flag(trap_exit, true), NewState = State#channel_state{writer_pid = Peer}, {noreply, NewState}; @@ -336,23 +347,29 @@ handle_cast({register_flow_handler, FlowHandler}, State) -> handle_cast({notify_sent, _Peer}, State) -> {noreply, State}. -%--------------------------------------------------------------------------- -% Rabbit Writer API methods (gen_server callbacks). -% These callbacks are invoked when a direct channel sends messages -% to this gen_server instance. -%---------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Rabbit Writer API methods (gen_server callbacks). +%% These callbacks are invoked when a direct channel sends messages +%% to this gen_server instance. +%%--------------------------------------------------------------------------- -handle_info( {send_command, Method}, State) -> handle_method(Method, State); -handle_info( {send_command, Method, Content}, State) -> handle_method(Method, Content, State); +handle_info( {send_command, Method}, State) -> + handle_method(Method, State); -%--------------------------------------------------------------------------- -% Network Writer methods (gen_server callbacks). -% These callbacks are invoked when a network channel sends messages -% to this gen_server instance. -%--------------------------------------------------------------------------- +handle_info( {send_command, Method, Content}, State) -> + handle_method(Method, Content, State); -handle_info( {method, Method, none}, State) -> handle_method(Method, State); -handle_info( {method, Method, Content}, State) -> handle_method(Method, Content, State); +%%--------------------------------------------------------------------------- +%% Network Writer methods (gen_server callbacks). +%% These callbacks are invoked when a network channel sends messages +%% to this gen_server instance. +%%--------------------------------------------------------------------------- + +handle_info( {method, Method, none}, State) -> + handle_method(Method, State); + +handle_info( {method, Method, Content}, State) -> + handle_method(Method, Content, State); %% Handles the delivery of a message from a direct channel @@ -375,28 +392,24 @@ handle_info({'EXIT', _Pid, Reason}, NewState = channel_cleanup(State), {stop, normal, NewState}; -%--------------------------------------------------------------------------- -% This is for a race condition between a close.close_ok and a subsequent channel.open -%--------------------------------------------------------------------------- - +%% This is for a race condition between a close.close_ok and a subsequent +%% channel.open handle_info( {channel_close, Peer}, State ) -> NewState = channel_cleanup(State), %% TODO Do we still need this?? Peer ! handshake, {noreply, NewState}; -%--------------------------------------------------------------------------- -% This is for a channel exception that can't be otherwise handled -%--------------------------------------------------------------------------- - +%% This is for a channel exception that can't be otherwise handled handle_info( {channel_exception, Channel, Reason}, State) -> io:format("Channel ~p is shutting down due to: ~p~n",[Channel, Reason]), NewState = channel_cleanup(State), {stop, shutdown, NewState}. -%--------------------------------------------------------------------------- -% Rest of the gen_server callbacks -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Rest of the gen_server callbacks +%%--------------------------------------------------------------------------- + terminate(normal, _State) -> ok; diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 364005fdba..34efe6b356 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -30,19 +30,20 @@ -behaviour(gen_server). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). -export([open_channel/1, open_channel/3]). -export([start/2, start/3, start/4, close/2]). -export([start_link/2, start_link/3, start_link/4]). -%--------------------------------------------------------------------------- -% AMQP Connection API Methods -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% AMQP Connection API Methods +%%--------------------------------------------------------------------------- %% Starts a direct connection to the Rabbit AMQP server, assuming that %% the server is running in the same process space. -start(User,Password) -> start(User,Password,false). -start(User,Password,ProcLink) when is_boolean(ProcLink) -> +start(User, Password) -> start(User, Password, false). +start(User, Password, ProcLink) when is_boolean(ProcLink) -> InitialState = #connection_state{username = User, password = Password, vhostpath = <<"/">>}, @@ -50,9 +51,13 @@ start(User,Password,ProcLink) when is_boolean(ProcLink) -> Pid; %% Starts a networked conection to a remote AMQP server. -start(User,Password,Host) -> start(User,Password,Host,<<"/">>,false). -start(User,Password,Host,VHost) -> start(User,Password,Host,VHost,false). -start(User,Password,Host,VHost,ProcLink) -> +start(User, Password, Host) -> + start(User, Password, Host, <<"/">>, false). + +start(User, Password, Host, VHost) -> + start(User, Password, Host, VHost, false). + +start(User, Password, Host, VHost, ProcLink) -> InitialState = #connection_state{username = User, password = Password, serverhost = Host, @@ -60,37 +65,43 @@ start(User,Password,Host,VHost,ProcLink) -> {ok, Pid} = start_internal(InitialState, amqp_network_driver, ProcLink), Pid. -start_link(User,Password) -> start(User,Password,true). -start_link(User,Password,Host) -> start(User,Password,Host,<<"/">>,true). -start_link(User,Password,Host,VHost) -> start(User,Password,Host,VHost,true). +start_link(User, Password) -> + start(User, Password, true). + +start_link(User, Password, Host) -> + start(User, Password, Host, <<"/">>, true). + +start_link(User, Password, Host, VHost) -> + start(User, Password, Host, VHost, true). start_internal(InitialState, Driver, _Link = true) when is_atom(Driver) -> gen_server:start_link(?MODULE, [InitialState, Driver], []); - + start_internal(InitialState, Driver, _Link = false) when is_atom(Driver) -> gen_server:start(?MODULE, [InitialState, Driver], []). %% Opens a channel without having to specify a channel number. %% This function assumes that an AMQP connection (networked or direct) %% has already been successfully established. -open_channel(ConnectionPid) -> open_channel(ConnectionPid, none, ""). +open_channel(ConnectionPid) -> + open_channel(ConnectionPid, none, ""). %% Opens a channel with a specific channel number. %% This function assumes that an AMQP connection (networked or direct) %% has already been successfully established. open_channel(ConnectionPid, ChannelNumber, OutOfBand) -> gen_server:call(ConnectionPid, - {open_channel, ChannelNumber, + {open_channel, ChannelNumber, amqp_util:binary(OutOfBand)}). %% Closes the AMQP connection close(ConnectionPid, Close) -> gen_server:call(ConnectionPid, Close). -%--------------------------------------------------------------------------- -% Internal plumbing -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Internal plumbing +%%--------------------------------------------------------------------------- -%% Starts a new channel process, invokes the correct driver +%% Starts a new channel process, invokes the correct driver %% (network or direct) to perform any environment specific channel setup and %% starts the AMQP ChannelOpen handshake. handle_open_channel({ChannelNumber, OutOfBand}, @@ -112,8 +123,8 @@ start_channel(ChannelNumber, close_fun = fun(X) -> Driver:close_channel(X) end, do2 = fun(X, Y) -> Driver:do(X, Y) end, do3 = fun(X, Y, Z) -> Driver:do(X, Y, Z) end, - reader_pid = ReaderPid, - writer_pid = WriterPid}, + reader_pid = ReaderPid, + writer_pid = WriterPid}, process_flag(trap_exit, true), {ok, ChannelPid} = gen_server:start_link(amqp_channel, [ChannelState], []), @@ -139,7 +150,7 @@ register_channel(ChannelNumber, ChannelPid, end, State#connection_state{channels = Channels1}. -%% This will be called when a channel process exits and needs to be +%% This will be called when a channel process exits and needs to be %% deregistered %% This peforms the reverse mapping so that you can lookup a channel pid %% Let's hope that this lookup doesn't get too expensive ....... @@ -157,8 +168,8 @@ unregister_channel(ChannelPid, dict:erase(ChannelNumber, Channels0) end, State#connection_state{channels = Channels1}; - -%% This will be called when a channel process exits and needs to be + +%% This will be called when a channel process exits and needs to be %% deregistered unregister_channel(ChannelNumber, State = #connection_state{channels = Channels0}) -> @@ -175,9 +186,9 @@ allocate_channel_number(Channels, _Max) -> close_connection(Close, From, State = #connection_state{driver = Driver}) -> Driver:close_connection(Close, From, State). -%--------------------------------------------------------------------------- -% gen_server callbacks -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% gen_server callbacks +%%--------------------------------------------------------------------------- init([InitialState, Driver]) when is_atom(Driver) -> State = Driver:handshake(InitialState), @@ -190,14 +201,14 @@ handle_call({open_channel, ChannelNumber, OutOfBand}, _From, State) -> %% Shuts the AMQP connection down handle_call(Close = #'connection.close'{}, From, State) -> close_connection(Close, From, State), - {stop,normal,State}. + {stop, normal, State}. handle_cast(_Message, State) -> {noreply, State}. -%--------------------------------------------------------------------------- -% Handle forced close from the broker -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Handle forced close from the broker +%%--------------------------------------------------------------------------- handle_info({method, #'connection.close'{reply_code = Code, reply_text = Text}, _Content}, @@ -206,37 +217,39 @@ handle_info({method, #'connection.close'{reply_code = Code, Driver:handle_broker_close(State), {stop, normal, State}; -%--------------------------------------------------------------------------- -% Trap exits -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Trap exits +%%--------------------------------------------------------------------------- -handle_info( {'EXIT', Pid, {amqp,Reason,Msg,Context}}, State) -> - io:format("Channel Peer ~p sent this message: ~p -> ~p~n",[Pid,Msg,Context]), +handle_info( {'EXIT', Pid, {amqp, Reason, Msg, Context}}, State) -> + io:format("Channel Peer ~p sent this message: ~p -> ~p~n", + [Pid, Msg, Context]), {HardError, Code, Text} = rabbit_framing:lookup_amqp_exception(Reason), case HardError of false -> - io:format("Just trapping this exit and proceding to trap an exit from the client channel process~n"), + io:format("Just trapping this exit and proceding to trap an + exit from the client channel process~n"), {noreply, State}; true -> io:format("Hard error: (Code = ~p, Text = ~p)~n", [Code, Text]), {stop, {hard_error, {Code, Text}}, State} - end; + end; %% Just the amqp channel shutting down, so unregister this channel handle_info( {'EXIT', Pid, normal}, State) -> {noreply, unregister_channel(Pid, State) }; - -% This is a special case for abruptly closed socket connections + +%% This is a special case for abruptly closed socket connections handle_info( {'EXIT', _Pid, {socket_error, Reason}}, State) -> {stop, {socket_error, Reason}, State}; handle_info( {'EXIT', Pid, Reason}, State) -> - io:format("Connection: Handling exit from ~p --> ~p~n",[Pid,Reason]), + io:format("Connection: Handling exit from ~p --> ~p~n", [Pid, Reason]), {noreply, unregister_channel(Pid, State) }. -%--------------------------------------------------------------------------- -% Rest of the gen_server callbacks -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- +%% Rest of the gen_server callbacks +%%--------------------------------------------------------------------------- terminate(_Reason, _State) -> ok. diff --git a/deps/amqp_client/src/amqp_direct_driver.erl b/deps/amqp_client/src/amqp_direct_driver.erl index 9e454bbb9e..f1cd61ca87 100644 --- a/deps/amqp_client/src/amqp_direct_driver.erl +++ b/deps/amqp_client/src/amqp_direct_driver.erl @@ -30,7 +30,7 @@ -include("amqp_client.hrl"). -export([handshake/1, open_channel/3, close_channel/1, close_connection/3]). --export([do/2,do/3]). +-export([do/2, do/3]). -export([handle_broker_close/1]). %--------------------------------------------------------------------------- @@ -43,23 +43,31 @@ handshake(ConnectionState = #connection_state{username = User, UserBin = amqp_util:binary(User), PassBin = amqp_util:binary(Pass), rabbit_access_control:user_pass_login(UserBin, PassBin), - rabbit_access_control:check_vhost_access(#user{username = UserBin}, VHostPath), + rabbit_access_control:check_vhost_access(#user{username = UserBin}, + VHostPath), ConnectionState. open_channel({_Channel, _OutOfBand}, ChannelPid, - State = #connection_state{username = User, vhostpath = VHost}) -> + State = #connection_state{username = User, + vhostpath = VHost}) -> UserBin = amqp_util:binary(User), ReaderPid = WriterPid = ChannelPid, Peer = rabbit_channel:start_link(ReaderPid, WriterPid, UserBin, VHost), amqp_channel:register_direct_peer(ChannelPid, Peer), State. -close_channel(_WriterPid) -> ok. +close_channel(_WriterPid) -> + ok. close_connection(_Close, From, _State) -> gen_server:reply(From, #'connection.close_ok'{}). -do(Writer, Method) -> rabbit_channel:do(Writer, Method). -do(Writer, Method, Content) -> rabbit_channel:do(Writer, Method, Content). +do(Writer, Method) -> + rabbit_channel:do(Writer, Method). + +do(Writer, Method, Content) -> + rabbit_channel:do(Writer, Method, Content). + +handle_broker_close(_State) -> + ok. -handle_broker_close(_State) -> ok. diff --git a/deps/amqp_client/src/amqp_network_driver.erl b/deps/amqp_client/src/amqp_network_driver.erl index e74975edaf..e23004a122 100644 --- a/deps/amqp_client/src/amqp_network_driver.erl +++ b/deps/amqp_client/src/amqp_network_driver.erl @@ -31,7 +31,7 @@ -export([handshake/1, open_channel/3, close_channel/1, close_connection/3]). -export([start_reader/2, start_writer/2]). --export([do/2,do/3]). +-export([do/2, do/3]). -export([handle_broker_close/1]). -define(SOCKET_CLOSING_TIMEOUT, 1000). @@ -40,23 +40,26 @@ % Driver API Methods %--------------------------------------------------------------------------- -handshake(ConnectionState = #connection_state{serverhost = Host}) -> - case gen_tcp:connect(Host, 5672, [binary, {packet, 0},{active,false}]) of +handshake(State = #connection_state{serverhost = Host}) -> + case gen_tcp:connect(Host, 5672, + [binary, {packet, 0}, {active, false}]) of {ok, Sock} -> ok = gen_tcp:send(Sock, amqp_util:protocol_header()), Parent = self(), - FramingPid = rabbit_framing_channel:start_link(fun(X) -> X end, [Parent]), - ReaderPid = spawn_link(?MODULE, start_reader, [Sock, FramingPid]), + FramingPid = rabbit_framing_channel:start_link(fun(X) -> X end, + [Parent]), + ReaderPid = spawn_link(?MODULE, start_reader, + [Sock, FramingPid]), WriterPid = start_writer(Sock, 0), - ConnectionState1 = ConnectionState#connection_state{channel0_writer_pid = WriterPid, - reader_pid = ReaderPid, - sock = Sock}, - ConnectionState2 = network_handshake(WriterPid, ConnectionState1), - #connection_state{heartbeat = Heartbeat} = ConnectionState2, + State1 = State#connection_state{channel0_writer_pid = WriterPid, + reader_pid = ReaderPid, + sock = Sock}, + State2 = network_handshake(WriterPid, State1), + #connection_state{heartbeat = Heartbeat} = State2, ReaderPid ! {heartbeat, Heartbeat}, - ConnectionState2; + State2; {error, Reason} -> - io:format("Could not start the network driver: ~p~n",[Reason]), + io:format("Could not start the network driver: ~p~n", [Reason]), exit(Reason) end. @@ -72,7 +75,7 @@ open_channel({ChannelNumber, _OutOfBand}, ChannelPid, amqp_channel:register_direct_peer(ChannelPid, WriterPid ). close_channel(WriterPid) -> - %io:format("Shutting the channel writer ~p down~n",[WriterPid]), + %io:format("Shutting the channel writer ~p down~n", [WriterPid]), rabbit_writer:shutdown(WriterPid). %% This closes the writer down, waits for the confirmation from the @@ -89,8 +92,11 @@ close_connection(Close = #'connection.close'{}, From, exit(timeout_on_exit) end. -do(Writer, Method) -> rabbit_writer:send_command(Writer, Method). -do(Writer, Method, Content) -> rabbit_writer:send_command(Writer, Method, Content). +do(Writer, Method) -> + rabbit_writer:send_command(Writer, Method). + +do(Writer, Method, Content) -> + rabbit_writer:send_command(Writer, Method, Content). handle_broker_close(#connection_state{channel0_writer_pid = Writer, reader_pid = Reader}) -> @@ -117,7 +123,8 @@ recv() -> % Internal plumbing %--------------------------------------------------------------------------- -network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) -> +network_handshake(Writer, + State = #connection_state{ vhostpath = VHostPath }) -> #'connection.start'{} = recv(), do(Writer, start_ok(State)), #'connection.tune'{channel_max = ChannelMax, @@ -129,7 +136,8 @@ network_handshake(Writer, State = #connection_state{ vhostpath = VHostPath }) -> do(Writer, TuneOk), %% This is something where I don't understand the protocol, - %% What happens if the following command reaches the server before the tune ok? + %% What happens if the following command reaches the server + %% before the tune ok? %% Or doesn't get sent at all? ConnectionOpen = #'connection.open'{virtual_host = VHostPath, capabilities = <<"">>, @@ -146,7 +154,7 @@ start_ok(#connection_state{username = Username, password = Password}) -> client_properties = [ {<<"product">>, longstr, <<"Erlang-AMQC">>}, {<<"version">>, longstr, <<"0.1">>}, - {<<"platform">>,longstr, <<"Erlang">>} + {<<"platform">>, longstr, <<"Erlang">>} ], mechanism = <<"AMQPLAIN">>, response = rabbit_binary_generator:generate_table(LoginTable), @@ -154,7 +162,7 @@ start_ok(#connection_state{username = Username, password = Password}) -> start_reader(Sock, FramingPid) -> process_flag(trap_exit, true), - put({channel, 0},{chpid, FramingPid}), + put({channel, 0}, {chpid, FramingPid}), {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), reader_loop(Sock, undefined, undefined, undefined), gen_tcp:close(Sock). @@ -164,7 +172,7 @@ start_writer(Sock, Channel) -> reader_loop(Sock, Type, Channel, Length) -> receive - {inet_async, Sock, _, {ok, <>} } -> + {inet_async, Sock, _, {ok, <>} } -> case handle_frame(Type, Channel, Payload) of closed_ok -> ok; @@ -172,9 +180,9 @@ reader_loop(Sock, Type, Channel, Length) -> {ok, _Ref} = prim_inet:async_recv(Sock, 7, -1), reader_loop(Sock, undefined, undefined, undefined) end; - {inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} -> + {inet_async, Sock, _, {ok, <<_Type:8, _Chan:16, PayloadSize:32>>}} -> {ok, _Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1), - reader_loop(Sock, _Type, _Channel, PayloadSize); + reader_loop(Sock, _Type, _Chan, PayloadSize); {inet_async, Sock, _Ref, {error, closed}} -> ok; {inet_async, Sock, _Ref, {error, Reason}} -> @@ -187,11 +195,13 @@ reader_loop(Sock, Type, Channel, Length) -> start_framing_channel(ChannelPid, ChannelNumber), reader_loop(Sock, Type, Channel, Length); timeout -> - io:format("Reader (~p) received timeout from heartbeat, exiting ~n",[self()]); + io:format("Reader (~p) received timeout from heartbeat, + exiting ~n", [self()]); close -> - io:format("Reader (~p) received close command, exiting ~n",[self()]); + io:format("Reader (~p) received close command, + exiting ~n", [self()]); {'EXIT', Pid, _Reason} -> - [H|_] = get_keys({chpid,Pid}), + [H|_] = get_keys({chpid, Pid}), erase(H), reader_loop(Sock, Type, Channel, Length); Other -> @@ -200,8 +210,9 @@ reader_loop(Sock, Type, Channel, Length) -> end. start_framing_channel(ChannelPid, ChannelNumber) -> - FramingPid = rabbit_framing_channel:start_link(fun(X) -> link(X), X end, [ChannelPid]), - put({channel, ChannelNumber},{chpid, FramingPid}). + FramingPid = rabbit_framing_channel:start_link(fun(X) -> link(X), X end, + [ChannelPid]), + put({channel, ChannelNumber}, {chpid, FramingPid}). handle_frame(Type, Channel, Payload) -> case rabbit_reader:analyze_frame(Type, Payload) of @@ -214,7 +225,7 @@ handle_frame(Type, Channel, Payload) -> heartbeat; trace -> trace; - {method,'connection.close_ok',Content} -> + {method, 'connection.close_ok', Content} -> send_frame(Channel, {method, 'connection.close_ok', Content}), closed_ok; AnalyzedFrame -> @@ -228,3 +239,4 @@ resolve_receiver(Channel) -> undefined -> exit(unknown_channel) end. + diff --git a/deps/amqp_client/src/amqp_rpc_client.erl b/deps/amqp_client/src/amqp_rpc_client.erl index c250553c85..48f3e62a44 100644 --- a/deps/amqp_client/src/amqp_rpc_client.erl +++ b/deps/amqp_client/src/amqp_rpc_client.erl @@ -79,7 +79,7 @@ publish(Payload, From, reply_to = Q}, lib_amqp:publish(Channel, X, RoutingKey, Payload, Props), State#rpc_client_state{correlation_id = CorrelationId + 1, - continuations + continuations = dict:store(CorrelationId, From, Continuations)}. %--------------------------------------------------------------------------- diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl index 08c9284204..80e638c0ad 100644 --- a/deps/amqp_client/src/amqp_util.erl +++ b/deps/amqp_client/src/amqp_util.erl @@ -33,7 +33,9 @@ -export([basic_properties/0, protocol_header/0]). basic_properties() -> - #'P_basic'{content_type = <<"application/octet-stream">>, delivery_mode = 1, priority = 0}. + #'P_basic'{content_type = <<"application/octet-stream">>, + delivery_mode = 1, + priority = 0}. protocol_header() -> <<"AMQP", 1, 1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>. diff --git a/deps/amqp_client/src/direct_client_test.erl b/deps/amqp_client/src/direct_client_test.erl index 1a350efbeb..44883d9307 100644 --- a/deps/amqp_client/src/direct_client_test.erl +++ b/deps/amqp_client/src/direct_client_test.erl @@ -33,30 +33,39 @@ -include_lib("eunit/include/eunit.hrl"). -basic_get_test() -> test_util:basic_get_test(new_connection()). +basic_get_test() -> + test_util:basic_get_test(new_connection()). -basic_return_test() -> test_util:basic_return_test(new_connection()). +basic_return_test() -> + test_util:basic_return_test(new_connection()). -basic_qos_test() -> test_util:basic_qos_test(new_connection()). +basic_qos_test() -> + test_util:basic_qos_test(new_connection()). -basic_recover_test() -> test_util:basic_recover_test(new_connection()). +basic_recover_test() -> + test_util:basic_recover_test(new_connection()). -basic_consume_test() -> test_util:basic_consume_test(new_connection()). +basic_consume_test() -> + test_util:basic_consume_test(new_connection()). -lifecycle_test() -> test_util:lifecycle_test(new_connection()). +lifecycle_test() -> + test_util:lifecycle_test(new_connection()). -basic_ack_test() ->test_util:basic_ack_test(new_connection()). +basic_ack_test() -> + test_util:basic_ack_test(new_connection()). -command_serialization_test() -> test_util:command_serialization_test(new_connection()). +command_serialization_test() -> + test_util:command_serialization_test(new_connection()). -%---------------------------------------------------------------------------- -% This must be kicked off manually because it can only be run after Rabbit -% has been running for 1 minute +%%--------------------------------------------------------------------------- +%% This must be kicked off manually because it can only be run after Rabbit +%% has been running for 1 minute test_channel_flow() -> test_util:channel_flow_test(new_connection()). -%---------------------------------------------------------------------------- -% Negative Tests +%%--------------------------------------------------------------------------- +%% Negative Tests +%%--------------------------------------------------------------------------- non_existent_exchange_test() -> negative_test_util:non_existent_exchange_test(new_connection()). @@ -64,10 +73,12 @@ non_existent_exchange_test() -> queue_unbind_test() -> test_util:queue_unbind_test(new_connection()). -%---------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- %% Common Functions +%%--------------------------------------------------------------------------- -new_connection() -> amqp_connection:start("guest", "guest"). +new_connection() -> + amqp_connection:start("guest", "guest"). test_coverage() -> rabbit_misc:enable_cover(), diff --git a/deps/amqp_client/src/lib_amqp.erl b/deps/amqp_client/src/lib_amqp.erl index f2fad51237..597becc0df 100644 --- a/deps/amqp_client/src/lib_amqp.erl +++ b/deps/amqp_client/src/lib_amqp.erl @@ -1,3 +1,28 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is the RabbitMQ Erlang Client. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial +%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C) +%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): Ben Hood <0x6e6562@gmail.com>. +%% + -module(lib_amqp). -include_lib("rabbitmq_server/include/rabbit.hrl"). @@ -21,9 +46,12 @@ declare_exchange(Channel, X) -> declare_exchange(Channel, X, Type) -> ExchangeDeclare = #'exchange.declare'{exchange = X, type = Type, - passive = false, durable = false, - auto_delete = false, internal = false, - nowait = false, arguments = []}, + passive = false, + durable = false, + auto_delete = false, + internal = false, + nowait = false, + arguments = []}, amqp_channel:call(Channel, ExchangeDeclare). delete_exchange(Channel, X) -> @@ -31,9 +59,9 @@ delete_exchange(Channel, X) -> if_unused = false, nowait = false}, #'exchange.delete_ok'{} = amqp_channel:call(Channel, ExchangeDelete). -%--------------------------------------------------------------------------- -% TODO This whole section of optional properties and mandatory flags -% may have to be re-thought +%%--------------------------------------------------------------------------- +%% TODO This whole section of optional properties and mandatory flags +%% may have to be re-thought publish(Channel, X, RoutingKey, Payload) -> publish(Channel, X, RoutingKey, Payload, false). @@ -41,10 +69,10 @@ publish(Channel, X, RoutingKey, Payload, Mandatory) when is_boolean(Mandatory)-> publish(Channel, X, RoutingKey, Payload, Mandatory, amqp_util:basic_properties()); - + publish(Channel, X, RoutingKey, Payload, Properties) -> publish(Channel, X, RoutingKey, Payload, false, Properties). - + publish(Channel, X, RoutingKey, Payload, Mandatory, Properties) -> publish_internal(fun amqp_channel:call/3, Channel, X, RoutingKey, Payload, Mandatory, Properties). @@ -69,18 +97,23 @@ publish_internal(Fun, Channel, X, RoutingKey, payload_fragments_rev = [Payload]}, Fun(Channel, BasicPublish, Content). -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- close_channel(Channel) -> - ChannelClose = #'channel.close'{reply_code = 200, reply_text = <<"Goodbye">>, - class_id = 0, method_id = 0}, + ChannelClose = #'channel.close'{reply_code = 200, + reply_text = <<"Goodbye">>, + class_id = 0, + method_id = 0}, #'channel.close_ok'{} = amqp_channel:call(Channel, ChannelClose), ok. close_connection(Connection) -> - ConnectionClose = #'connection.close'{reply_code = 200, reply_text = <<"Goodbye">>, - class_id = 0, method_id = 0}, - #'connection.close_ok'{} = amqp_connection:close(Connection, ConnectionClose), + ConnectionClose = #'connection.close'{reply_code = 200, + reply_text = <<"Goodbye">>, + class_id = 0, + method_id = 0}, + #'connection.close_ok'{} = amqp_connection:close(Connection, + ConnectionClose), ok. teardown(Connection, Channel) -> @@ -121,13 +154,13 @@ subscribe(Channel, Q, Consumer, Tag, NoAck) -> consumer_tag = Tag, no_local = false, no_ack = NoAck, exclusive = false, nowait = false}, - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:subscribe(Channel,BasicConsume, Consumer), + #'basic.consume_ok'{consumer_tag = ConsumerTag} = + amqp_channel:subscribe(Channel, BasicConsume, Consumer), ConsumerTag. unsubscribe(Channel, Tag) -> BasicCancel = #'basic.cancel'{consumer_tag = Tag, nowait = false}, - #'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel), + #'basic.cancel_ok'{} = amqp_channel:call(Channel, BasicCancel), ok. %%--------------------------------------------------------------------------- @@ -174,10 +207,12 @@ delete_queue(Channel, Q) -> bind_queue(Channel, X, Q, Binding) -> QueueBind = #'queue.bind'{queue = Q, exchange = X, - routing_key = Binding, nowait = false, arguments = []}, + routing_key = Binding, + nowait = false, arguments = []}, #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind). unbind_queue(Channel, X, Q, Binding) -> Unbind = #'queue.unbind'{queue = Q, exchange = X, routing_key = Binding, arguments = []}, #'queue.unbind_ok'{} = amqp_channel:call(Channel, Unbind). + diff --git a/deps/amqp_client/src/negative_test_util.erl b/deps/amqp_client/src/negative_test_util.erl index 1e4f3395d7..0174271f06 100644 --- a/deps/amqp_client/src/negative_test_util.erl +++ b/deps/amqp_client/src/negative_test_util.erl @@ -35,7 +35,7 @@ non_existent_exchange_test(Connection) -> Payload = <<"foobar">>, Channel = lib_amqp:start_channel(Connection), lib_amqp:declare_exchange(Channel, X), - % Deliberately mix up the routingkey and exchange arguments + %% Deliberately mix up the routingkey and exchange arguments lib_amqp:publish(Channel, RoutingKey, X, Payload), receive X -> ok @@ -43,4 +43,4 @@ non_existent_exchange_test(Connection) -> end, ?assertNot(is_process_alive(Channel)), ?assert(is_process_alive(Connection)), - lib_amqp:close_connection(Connection). \ No newline at end of file + lib_amqp:close_connection(Connection). diff --git a/deps/amqp_client/src/network_client_test.erl b/deps/amqp_client/src/network_client_test.erl index e7875da8dc..165fa441f4 100644 --- a/deps/amqp_client/src/network_client_test.erl +++ b/deps/amqp_client/src/network_client_test.erl @@ -30,34 +30,34 @@ -include_lib("eunit/include/eunit.hrl"). basic_get_test() -> - test_util:basic_get_test(new_connection()). + test_util:basic_get_test(new_connection()). basic_return_test() -> - test_util:basic_return_test(new_connection()). + test_util:basic_return_test(new_connection()). basic_qos_test() -> - test_util:basic_qos_test(new_connection()). + test_util:basic_qos_test(new_connection()). basic_recover_test() -> - test_util:basic_recover_test(new_connection()). + test_util:basic_recover_test(new_connection()). basic_consume_test() -> - test_util:basic_consume_test(new_connection()). + test_util:basic_consume_test(new_connection()). lifecycle_test() -> - test_util:lifecycle_test(new_connection()). + test_util:lifecycle_test(new_connection()). basic_ack_test() -> - test_util:basic_ack_test(new_connection()). + test_util:basic_ack_test(new_connection()). channel_lifecycle_test() -> - test_util:channel_lifecycle_test(new_connection()). - + test_util:channel_lifecycle_test(new_connection()). + queue_unbind_test() -> test_util:queue_unbind_test(new_connection()). command_serialization_test() -> - test_util:command_serialization_test(new_connection()). + test_util:command_serialization_test(new_connection()). teardown_test() -> test_util:teardown_test(new_connection()). @@ -65,14 +65,14 @@ teardown_test() -> rpc_test() -> test_util:rpc_test(new_connection()). -%---------------------------------------------------------------------------- -% Negative Tests +%%--------------------------------------------------------------------------- +%% Negative Tests non_existent_exchange_test() -> negative_test_util:non_existent_exchange_test(new_connection()). -%---------------------------------------------------------------------------- -%% Common Functions +%%--------------------------------------------------------------------------- +%% Common Functions new_connection() -> amqp_connection:start("guest", "guest", "localhost"). diff --git a/deps/amqp_client/src/test_util.erl b/deps/amqp_client/src/test_util.erl index e51d4f9c75..ecb3133795 100644 --- a/deps/amqp_client/src/test_util.erl +++ b/deps/amqp_client/src/test_util.erl @@ -32,37 +32,40 @@ -compile([export_all]). --record(publish,{q, x, routing_key, bind_key, payload, +-record(publish, {q, x, routing_key, bind_key, payload, mandatory = false, immediate = false}). -% The latch constant defines how many processes are spawned in order -% to run certain functionality in parallel. It follows the standard -% countdown latch pattern. +%% The latch constant defines how many processes are spawned in order +%% to run certain functionality in parallel. It follows the standard +%% countdown latch pattern. -define(Latch, 100). -% The wait constant defines how long a consumer waits before it -% unsubscribes +%% The wait constant defines how long a consumer waits before it +%% unsubscribes -define(Wait, 200). %%%% -% -% This is an example of how the client interaction should work -% -% Connection = amqp_connection:start(User, Password, Host), -% Channel = amqp_connection:open_channel(Connection), -% %%...do something useful -% ChannelClose = #'channel.close'{ %% set the appropriate fields }, -% amqp_channel:call(Channel, ChannelClose), -% ConnectionClose = #'connection.close'{ %% set the appropriate fields }, -% amqp_connection:close(Connection, ConnectionClose). -% +%% +%% This is an example of how the client interaction should work +%% +%% Connection = amqp_connection:start(User, Password, Host), +%% Channel = amqp_connection:open_channel(Connection), +%% %%...do something useful +%% ChannelClose = #'channel.close'{ %% set the appropriate fields }, +%% amqp_channel:call(Channel, ChannelClose), +%% ConnectionClose = #'connection.close'{ %% set the appropriate fields }, +%% amqp_connection:close(Connection, ConnectionClose). +%% lifecycle_test(Connection) -> X = <<"x">>, Channel = lib_amqp:start_channel(Connection), lib_amqp:declare_exchange(Channel, X, <<"topic">>), Parent = self(), - [spawn(fun() -> queue_exchange_binding(Channel, X, Parent, Tag) end) || Tag <- lists:seq(1,?Latch)], + [spawn( + fun() -> + queue_exchange_binding(Channel, X, Parent, Tag) end) + || Tag <- lists:seq(1, ?Latch)], latch_loop(?Latch), lib_amqp:delete_exchange(Channel, X), lib_amqp:teardown(Connection, Channel), @@ -74,7 +77,7 @@ queue_exchange_binding(Channel, X, Parent, Tag) -> after (?Latch - Tag rem 7) * 10 -> ok end, - Q = <<"a.b.c",Tag:32>>, + Q = <<"a.b.c", Tag:32>>, Binding = <<"a.b.c.*">>, Q1 = lib_amqp:declare_queue(Channel, Q), ?assertMatch(Q, Q1), @@ -89,17 +92,17 @@ channel_lifecycle_test(Connection) -> lib_amqp:teardown(Connection, Channel2), ok. -% This is designed to exercize the internal queuing mechanism -% to ensure that commands are properly serialized +%% This is designed to exercize the internal queuing mechanism +%% to ensure that commands are properly serialized command_serialization_test(Connection) -> Channel = lib_amqp:start_channel(Connection), Parent = self(), - [spawn(fun() -> + [spawn(fun() -> Q = uuid(), Q1 = lib_amqp:declare_queue(Channel, Q), - ?assertMatch(Q, Q1), + ?assertMatch(Q, Q1), Parent ! finished - end) || _ <- lists:seq(1,?Latch)], + end) || _ <- lists:seq(1, ?Latch)], latch_loop(?Latch), lib_amqp:teardown(Connection, Channel). @@ -120,7 +123,7 @@ queue_unbind_test(Connection) -> get_and_assert_empty(Channel, Q) -> BasicGetEmpty = lib_amqp:get(Channel, Q, false), ?assertMatch('basic.get_empty', BasicGetEmpty). - + get_and_assert_equals(Channel, Q, Payload) -> Content = lib_amqp:get(Channel, Q), #content{payload_fragments_rev = PayloadFragments} = Content, @@ -129,8 +132,8 @@ get_and_assert_equals(Channel, Q, Payload) -> basic_get_test(Connection) -> Channel = lib_amqp:start_channel(Connection), {ok, Q} = setup_publish(Channel), - % TODO: This could be refactored to use get_and_assert_equals, - % get_and_assert_empty .... would require another bug though :-) + %% TODO: This could be refactored to use get_and_assert_equals, + %% get_and_assert_empty .... would require another bug though :-) Content = lib_amqp:get(Channel, Q), #content{payload_fragments_rev = PayloadFragments} = Content, ?assertMatch([<<"foobar">>], PayloadFragments), @@ -158,7 +161,7 @@ basic_return_test(Connection) -> ?assertMatch([Payload], Payload2); WhatsThis -> %% TODO investigate where this comes from - io:format("Spurious message ~p~n",[WhatsThis]) + io:format("Spurious message ~p~n", [WhatsThis]) after 2000 -> exit(no_return_received) end, @@ -193,7 +196,7 @@ consume_loop(Channel, X, RoutingKey, Parent, Tag) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end, - receive + receive {#'basic.deliver'{}, _} -> ok end, lib_amqp:unsubscribe(Channel, Tag), @@ -221,7 +224,7 @@ basic_recover_test(Connection) -> exit(did_not_receive_first_message) end, BasicRecover = #'basic.recover'{requeue = true}, - amqp_channel:cast(Channel,BasicRecover), + amqp_channel:cast(Channel, BasicRecover), receive {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} -> lib_amqp:ack(Channel, DeliveryTag2) @@ -230,27 +233,28 @@ basic_recover_test(Connection) -> end, lib_amqp:teardown(Connection, Channel). -% QOS is not yet implemented in RabbitMQ +%% QOS is not yet implemented in RabbitMQ basic_qos_test(Connection) -> lib_amqp:close_connection(Connection). -% Reject is not yet implemented in RabbitMQ +%% Reject is not yet implemented in RabbitMQ basic_reject_test(Connection) -> lib_amqp:close_connection(Connection). -%---------------------------------------------------------------------------- -% Unit test for the direct client -% This just relies on the fact that a fresh Rabbit VM must consume more than -% 0.1 pc of the system memory: -% 0. Wait 1 minute to let memsup do stuff -% 1. Make sure that the high watermark is set high -% 2. Start a process to receive the pause and resume commands from the broker -% 3. Register this as flow control notification handler -% 4. Let the system settle for a little bit -% 5. Set the threshold to the lowest possible value -% 6. When the flow handler receives the pause command, it sets the watermark -% to a high value in order to get the broker to send the resume command -% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and fail +%%---------------------------------------------------------------------------- +%% Unit test for the direct client +%% This just relies on the fact that a fresh Rabbit VM must consume more than +%% 0.1 pc of the system memory: +%% 0. Wait 1 minute to let memsup do stuff +%% 1. Make sure that the high watermark is set high +%% 2. Start a process to receive the pause and resume commands from the broker +%% 3. Register this as flow control notification handler +%% 4. Let the system settle for a little bit +%% 5. Set the threshold to the lowest possible value +%% 6. When the flow handler receives the pause command, it sets the watermark +%% to a high value in order to get the broker to send the resume command +%% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and +%% fail channel_flow_test(Connection) -> X = <<"amq.direct">>, K = Payload = <<"x">>, @@ -280,9 +284,9 @@ channel_flow_test(Connection) -> exit(did_not_receive_channel_flow) end. -%---------------------------------------------------------------------------- -% This is a test, albeit not a unit test, to see if the producer -% handles the effect of being throttled. +%%---------------------------------------------------------------------------- +%% This is a test, albeit not a unit test, to see if the producer +%% handles the effect of being throttled. channel_flow_sync(Connection) -> start_channel_flow(Connection, fun lib_amqp:publish/4). @@ -324,21 +328,21 @@ cf_consumer_loop(Channel, Tag) -> ok end. -cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) +cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) when N rem 5000 =:= 0 -> io:format("Producer (~p) has sent about ~p messages since it started~n", [self(), N]), cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1); cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) -> - case PublishFun(Channel, X, Key, Payload) of + case PublishFun(Channel, X, Key, Payload) of blocked -> io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n", [self()]), receive resume -> io:format("Producer (~p) has woken up :-)~n", [self()]), - cf_producer_loop(Channel, X, Key, + cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1) end; _ -> @@ -358,9 +362,9 @@ cf_handler_loop(Producer) -> stop -> ok end. -%--------------------------------------------------------------------------- -% This tests whether RPC over AMQP produces the same result as invoking the -% same argument against the same underlying gen_server instance. +%%--------------------------------------------------------------------------- +%% This tests whether RPC over AMQP produces the same result as invoking the +%% same argument against the same underlying gen_server instance. rpc_test(Connection) -> Q = uuid(), Fun = fun(X) -> X + 1 end, @@ -376,7 +380,7 @@ rpc_test(Connection) -> amqp_rpc_server:stop(Server), ok. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- setup_publish(Channel) -> Publish = #publish{routing_key = <<"a.b.c.d">>, @@ -409,7 +413,9 @@ setup_exchange(Channel, Q, X, Binding) -> lib_amqp:bind_queue(Channel, X, Q, Binding), ok. -latch_loop(0) -> ok; +latch_loop(0) -> + ok; + latch_loop(Latch) -> receive finished -> @@ -420,5 +426,5 @@ latch_loop(Latch) -> uuid() -> {A, B, C} = now(), - <>. + <>.