diff --git a/deps/amqp_client/Makefile b/deps/amqp_client/Makefile index b85fda76fc..fac2b7031a 100644 --- a/deps/amqp_client/Makefile +++ b/deps/amqp_client/Makefile @@ -1,13 +1,11 @@ EBIN_DIR=ebin SOURCE_DIR=src -TEST_SOURCE_DIR=test INCLUDE_DIR=include ERLC_FLAGS=-W0 compile: mkdir -p $(EBIN_DIR) - erlc -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(SOURCE_DIR)/*.erl - erlc -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(TEST_SOURCE_DIR)/*.erl + erlc +debug_info -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(SOURCE_DIR)/*.erl all: compile diff --git a/deps/amqp_client/include/amqp_client.hrl b/deps/amqp_client/include/amqp_client.hrl index c82ef262fe..0dea1e27c4 100644 --- a/deps/amqp_client/include/amqp_client.hrl +++ b/deps/amqp_client/include/amqp_client.hrl @@ -18,11 +18,4 @@ closing = false, consumers = dict:new()} ). --record(rpc_client_state, {channel_pid, - ticket, - exchange, - routing_key, - queue, - consumer_tag, - continuations = dict:new(), - correlation_id = 0}). +-record(rpc_client, {channel_pid, ticket, exchange, routing_key, queue, consumer_tag}). diff --git a/deps/amqp_client/test/amqp_consumer.erl b/deps/amqp_client/src/amqp_consumer.erl similarity index 100% rename from deps/amqp_client/test/amqp_consumer.erl rename to deps/amqp_client/src/amqp_consumer.erl diff --git a/deps/amqp_client/test/amqp_direct_client_test.erl b/deps/amqp_client/src/amqp_direct_client_test.erl similarity index 82% rename from deps/amqp_client/test/amqp_direct_client_test.erl rename to deps/amqp_client/src/amqp_direct_client_test.erl index a598b6a56d..200b5e3bd0 100644 --- a/deps/amqp_client/test/amqp_direct_client_test.erl +++ b/deps/amqp_client/src/amqp_direct_client_test.erl @@ -1,5 +1,7 @@ -module(amqp_direct_client_test). +-export([test_coverage/0]). + -include_lib("eunit/include/eunit.hrl"). basic_get_test() -> @@ -17,3 +19,8 @@ lifecycle_test() -> basic_ack_test() -> Connection = amqp_connection:start("guest", "guest"), amqp_test_util:basic_ack_test(Connection). + +test_coverage() -> + rabbit_misc:enable_cover(), + test(), + rabbit_misc:report_cover(). diff --git a/deps/amqp_client/src/amqp_rpc_util.erl b/deps/amqp_client/src/amqp_method_util.erl similarity index 71% rename from deps/amqp_client/src/amqp_rpc_util.erl rename to deps/amqp_client/src/amqp_method_util.erl index afd14b13df..8cb8ee25eb 100644 --- a/deps/amqp_client/src/amqp_rpc_util.erl +++ b/deps/amqp_client/src/amqp_method_util.erl @@ -1,15 +1,15 @@ --module(amqp_rpc_util). +-module(amqp_method_util). --include_lib("rabbitmq_server/include/rabbit_framing.hrl"). -include("amqp_client.hrl"). +-include_lib("rabbitmq_server/include/rabbit_framing.hrl"). -export([register_consumer/2]). % Registers a consumer in this channel -register_consumer(RpcClientState = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket, queue = Q}, Consumer) -> +register_consumer(RpcClientState = #rpc_client{channel_pid = ChannelPid, ticket = Ticket, queue = Q}, Consumer) -> Tag = <<"">>, BasicConsume = #'basic.consume'{ticket = Ticket, queue = Q, consumer_tag = Tag, no_local = false, no_ack = true, exclusive = false, nowait = false}, #'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, Consumer), - RpcClientState#rpc_client_state{consumer_tag = ConsumerTag}. + RpcClientState#rpc_client{consumer_tag = ConsumerTag}. diff --git a/deps/amqp_client/test/amqp_network_client_test.erl b/deps/amqp_client/src/amqp_network_client_test.erl similarity index 82% rename from deps/amqp_client/test/amqp_network_client_test.erl rename to deps/amqp_client/src/amqp_network_client_test.erl index eb475ef075..cdc993cbdb 100644 --- a/deps/amqp_client/test/amqp_network_client_test.erl +++ b/deps/amqp_client/src/amqp_network_client_test.erl @@ -1,5 +1,7 @@ -module(amqp_network_client_test). +-export([test_coverage/0]). + -include_lib("eunit/include/eunit.hrl"). basic_get_test() -> @@ -18,6 +20,7 @@ basic_ack_test() -> Connection = amqp_connection:start("guest", "guest", "localhost"), amqp_test_util:basic_ack_test(Connection). -rpc_client_test() -> - Connection = amqp_connection:start("guest", "guest", "localhost"), - amqp_test_util:rpc_client_test(Connection). +test_coverage() -> + rabbit_misc:enable_cover(), + test(), + rabbit_misc:report_cover(). diff --git a/deps/amqp_client/src/amqp_network_driver.erl b/deps/amqp_client/src/amqp_network_driver.erl index a49b64856a..e4e119771f 100644 --- a/deps/amqp_client/src/amqp_network_driver.erl +++ b/deps/amqp_client/src/amqp_network_driver.erl @@ -153,8 +153,6 @@ writer_loop(Sock) -> {Sender, Method, Content} when is_pid(Sender) -> Channel = resolve_channel(Sender), FrameMax = 131072, %% set to zero once QPid fix their negotiation - io:format("1. Trying to send ~p~n",[Method]), - io:format("2. Trying to send ~p~n",[Content]), rabbit_writer:internal_send_command(Sock, Channel, Method, Content, FrameMax), writer_loop(Sock); Other -> diff --git a/deps/amqp_client/src/amqp_rpc_client.erl b/deps/amqp_client/src/amqp_rpc_client.erl index 6d250bd67e..148c86fb25 100644 --- a/deps/amqp_client/src/amqp_rpc_client.erl +++ b/deps/amqp_client/src/amqp_rpc_client.erl @@ -1,13 +1,11 @@ -module(amqp_rpc_client). -include_lib("rabbitmq_server/include/rabbit_framing.hrl"). --include_lib("rabbitmq_server/include/rabbit.hrl"). -include("amqp_client.hrl"). -behaviour(gen_server). -export([start/1]). --export([call/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). %--------------------------------------------------------------------------- @@ -15,19 +13,15 @@ %--------------------------------------------------------------------------- start(BrokerConfig) -> - {ok, RpcClientPid} = gen_server:start(?MODULE, [BrokerConfig], []), - RpcClientPid. - -call(RpcClientPid, Payload) -> - gen_server:call(RpcClientPid, Payload). + gen_server:start(?MODULE, [BrokerConfig], []). %--------------------------------------------------------------------------- % Plumbing %--------------------------------------------------------------------------- % Sets up a reply queue for this client to listen on -setup_reply_queue(State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket}) -> - QueueDeclare = #'queue.declare'{ticket = Ticket, queue = <<>>, +setup_reply_queue(State = #rpc_client{channel_pid = ChannelPid, ticket = Ticket}) -> + QueueDeclare = #'queue.declare'{ticket = Ticket, queue = [], passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = []}, @@ -35,62 +29,36 @@ setup_reply_queue(State = #rpc_client_state{channel_pid = ChannelPid, ticket = T message_count = MessageCount, consumer_count = ConsumerCount} = amqp_channel:call(ChannelPid, QueueDeclare), - State#rpc_client_state{queue = Q}. + State#rpc_client{queue = Q}. % Sets up a consumer to handle rpc responses setup_consumer(State) -> - ConsumerTag = amqp_rpc_util:register_consumer(State, self()), - State#rpc_client_state{consumer_tag = ConsumerTag}. - -% Publishes to the broker, stores the From address against -% the correlation id and increments the correlationid for -% the next request -publish(Payload, From, - State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket, - exchange = X, routing_key = RoutingKey, - queue = Q, correlation_id = CorrelationId, - continuations = Continuations}) -> - BasicPublish = #'basic.publish'{ticket = Ticket, exchange = X, - routing_key = RoutingKey, - mandatory = false, immediate = false}, - Props = #'P_basic'{correlation_id = list_to_binary(integer_to_list(CorrelationId)), reply_to = Q}, - Content = #content{class_id = 60, %% TODO HARDCODED VALUE - properties = Props, properties_bin = 'none', - payload_fragments_rev = [Payload]}, - io:format("RPC publish 1 q -> ~p~n",[CorrelationId]), - amqp_channel:cast(ChannelPid, BasicPublish, Content), - NewContinuations = dict:store(CorrelationId, From , Continuations), - State#rpc_client_state{correlation_id = CorrelationId + 1, continuations = NewContinuations}. + ConsumerTag = amqp_method_util:register_consumer(State, self()), + State#rpc_client{consumer_tag = ConsumerTag}. %--------------------------------------------------------------------------- % gen_server callbacks %--------------------------------------------------------------------------- -% Sets up a reply queue and consumer within an existing channel -init([BrokerConfig = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket}]) -> +% Starts a new connection to the broker and opens up a new channel +init([BrokerConfig = #rpc_client{channel_pid = ChannelPid, ticket = Ticket}]) -> State = setup_reply_queue(BrokerConfig), NewState = setup_consumer(State), {ok, NewState}. +% Closes the channel and the broker connection terminate(Reason, State) -> - ok. + amqp_aux:close_channel(State), + amqp_aux:close_connection(State). -handle_call(Payload, From, State) -> - NewState = publish(Payload, From, State), - {noreply, NewState}. +handle_call(manage, From, State = #rpc_client{channel_pid = ChannelPid}) -> + Reply = amqp_channel:call(ChannelPid, []), + {reply, Reply, State}. handle_cast(Msg, State) -> {noreply, State}. -handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) -> - {noreply, State}; - -handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) -> - {noreply, State}; - -handle_info({content, ClassId, Properties, PropertiesBin, Payload}, State) -> - io:format("RPC bottom half: ~p~n",[Properties]), - io:format("RPC bottom half: ~p~n",[PropertiesBin]), +handle_info(Msg, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> diff --git a/deps/amqp_client/src/amqp_rpc_handler.erl b/deps/amqp_client/src/amqp_rpc_handler.erl deleted file mode 100644 index da8d147f74..0000000000 --- a/deps/amqp_client/src/amqp_rpc_handler.erl +++ /dev/null @@ -1,45 +0,0 @@ --module(amqp_rpc_handler). - --behaviour(gen_event). - --include_lib("rabbitmq_server/include/rabbit.hrl"). --include_lib("rabbitmq_server/include/rabbit_framing.hrl"). --include("amqp_client.hrl"). --export([init/1, handle_info/2, terminate/2]). - -%--------------------------------------------------------------------------- -% gen_event callbacks -%--------------------------------------------------------------------------- - -init([BrokerConfig]) -> - {ok, BrokerConfig}. - -handle_info(shutdown, State) -> - {remove_handler, State}; - -handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) -> - {ok, State}; - -handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) -> - {ok, State}; - -handle_info({content, ClassId, Properties, PropertiesBin, Payload}, - State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket, - exchange = X}) -> - Props = #'P_basic'{correlation_id = CorrelationId, reply_to = Q} - = rabbit_framing:decode_properties(ClassId, PropertiesBin), - io:format("------>RPC handler corr id: ~p~n", [CorrelationId]), - - BasicPublish = #'basic.publish'{ticket = Ticket, exchange = X, - routing_key = Q, - mandatory = false, immediate = false}, - ReplyProps = #'P_basic'{correlation_id = CorrelationId}, - Content = #content{class_id = 60, %% TODO HARDCODED VALUE - properties = ReplyProps, properties_bin = 'none', - payload_fragments_rev = [<<"f00bar">>]}, - amqp_channel:cast(ChannelPid, BasicPublish, Content), - - {ok, State}. - -terminate(Args, State) -> - ok. diff --git a/deps/amqp_client/test/amqp_test_util.erl b/deps/amqp_client/src/amqp_test_util.erl similarity index 87% rename from deps/amqp_client/test/amqp_test_util.erl rename to deps/amqp_client/src/amqp_test_util.erl index 899df44a64..5ff94fb39b 100644 --- a/deps/amqp_client/test/amqp_test_util.erl +++ b/deps/amqp_client/src/amqp_test_util.erl @@ -3,7 +3,6 @@ -include_lib("rabbitmq_server/include/rabbit.hrl"). -include_lib("rabbitmq_server/include/rabbit_framing.hrl"). -include_lib("eunit/include/eunit.hrl"). --include("amqp_client.hrl"). -compile([export_all]). @@ -104,29 +103,6 @@ basic_consume_test(Connection) -> end, teardown(Connection, Channel). -rpc_client_test(Connection) -> - Realm = <<"/data">>, - Q = <<"a.b.c">>, - X = <<"x">>, - BindKey = <<"a.b.c.*">>, - RoutingKey = <<"a.b.c.d">>, - {ChannelPid, Ticket} = setup_channel(Connection, Realm), - {ok, Consumer} = gen_event:start_link(), - gen_event:add_handler(Consumer, amqp_rpc_handler , [] ), - BasicConsume = #'basic.consume'{ticket = Ticket, queue = Q, - consumer_tag = <<"">>, - no_local = false, no_ack = true, exclusive = false, nowait = false}, - #'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, Consumer), - RpcClientState = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket, - exchange = X, routing_key = RoutingKey, - queue = Q}, - io:format("Before rpc client start~n"), - RpcClientPid = amqp_rpc_client:start(RpcClientState), - io:format("Before rpc client call~n"), - Reply = amqp_rpc_client:call(RpcClientPid, <<"foo">>), - io:format("Reply from RPC was ~p~n", [Reply]), - teardown(Connection, ChannelPid). - setup_publish(Connection) -> Realm = <<"/data">>, Q = <<"a.b.c">>, diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl index 6145f54702..101bb39f03 100644 --- a/deps/amqp_client/src/amqp_util.erl +++ b/deps/amqp_client/src/amqp_util.erl @@ -24,7 +24,11 @@ message_payload(Message) -> (Message#basic_message.content)#content.payload_fragments_rev. decode_method(Method, Content) -> + Reply = case rabbit_framing_channel:finish_reading_method(Method,Content) of - {ok, Method2, none} -> Method2; - {ok, Method2, Content2} -> {Method2, Content2} - end. + {ok, _Method, none} -> + _Method; + {ok, _Method, _Content} -> + {_Method,_Content} + end, + Reply.