Bug 17118: Add test coverage checking. Needed to move all the test code into
src/ to do this as the cover library only looks for source in src/.
This commit is contained in:
parent
635736c19b
commit
267f95b59c
|
@ -1,13 +1,11 @@
|
|||
EBIN_DIR=ebin
|
||||
SOURCE_DIR=src
|
||||
TEST_SOURCE_DIR=test
|
||||
INCLUDE_DIR=include
|
||||
ERLC_FLAGS=-W0
|
||||
|
||||
compile:
|
||||
mkdir -p $(EBIN_DIR)
|
||||
erlc -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(SOURCE_DIR)/*.erl
|
||||
erlc -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(TEST_SOURCE_DIR)/*.erl
|
||||
erlc +debug_info -I $(INCLUDE_DIR) -o $(EBIN_DIR) $(ERLC_FLAGS) $(SOURCE_DIR)/*.erl
|
||||
|
||||
all: compile
|
||||
|
||||
|
|
|
@ -18,11 +18,4 @@
|
|||
closing = false,
|
||||
consumers = dict:new()} ).
|
||||
|
||||
-record(rpc_client_state, {channel_pid,
|
||||
ticket,
|
||||
exchange,
|
||||
routing_key,
|
||||
queue,
|
||||
consumer_tag,
|
||||
continuations = dict:new(),
|
||||
correlation_id = 0}).
|
||||
-record(rpc_client, {channel_pid, ticket, exchange, routing_key, queue, consumer_tag}).
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
-module(amqp_direct_client_test).
|
||||
|
||||
-export([test_coverage/0]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
basic_get_test() ->
|
||||
|
@ -17,3 +19,8 @@ lifecycle_test() ->
|
|||
basic_ack_test() ->
|
||||
Connection = amqp_connection:start("guest", "guest"),
|
||||
amqp_test_util:basic_ack_test(Connection).
|
||||
|
||||
test_coverage() ->
|
||||
rabbit_misc:enable_cover(),
|
||||
test(),
|
||||
rabbit_misc:report_cover().
|
|
@ -1,15 +1,15 @@
|
|||
-module(amqp_rpc_util).
|
||||
-module(amqp_method_util).
|
||||
|
||||
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
|
||||
-include("amqp_client.hrl").
|
||||
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
|
||||
|
||||
-export([register_consumer/2]).
|
||||
|
||||
% Registers a consumer in this channel
|
||||
register_consumer(RpcClientState = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket, queue = Q}, Consumer) ->
|
||||
register_consumer(RpcClientState = #rpc_client{channel_pid = ChannelPid, ticket = Ticket, queue = Q}, Consumer) ->
|
||||
Tag = <<"">>,
|
||||
BasicConsume = #'basic.consume'{ticket = Ticket, queue = Q,
|
||||
consumer_tag = Tag,
|
||||
no_local = false, no_ack = true, exclusive = false, nowait = false},
|
||||
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, Consumer),
|
||||
RpcClientState#rpc_client_state{consumer_tag = ConsumerTag}.
|
||||
RpcClientState#rpc_client{consumer_tag = ConsumerTag}.
|
|
@ -1,5 +1,7 @@
|
|||
-module(amqp_network_client_test).
|
||||
|
||||
-export([test_coverage/0]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
basic_get_test() ->
|
||||
|
@ -18,6 +20,7 @@ basic_ack_test() ->
|
|||
Connection = amqp_connection:start("guest", "guest", "localhost"),
|
||||
amqp_test_util:basic_ack_test(Connection).
|
||||
|
||||
rpc_client_test() ->
|
||||
Connection = amqp_connection:start("guest", "guest", "localhost"),
|
||||
amqp_test_util:rpc_client_test(Connection).
|
||||
test_coverage() ->
|
||||
rabbit_misc:enable_cover(),
|
||||
test(),
|
||||
rabbit_misc:report_cover().
|
|
@ -153,8 +153,6 @@ writer_loop(Sock) ->
|
|||
{Sender, Method, Content} when is_pid(Sender) ->
|
||||
Channel = resolve_channel(Sender),
|
||||
FrameMax = 131072, %% set to zero once QPid fix their negotiation
|
||||
io:format("1. Trying to send ~p~n",[Method]),
|
||||
io:format("2. Trying to send ~p~n",[Content]),
|
||||
rabbit_writer:internal_send_command(Sock, Channel, Method, Content, FrameMax),
|
||||
writer_loop(Sock);
|
||||
Other ->
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
-module(amqp_rpc_client).
|
||||
|
||||
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
|
||||
-include_lib("rabbitmq_server/include/rabbit.hrl").
|
||||
-include("amqp_client.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start/1]).
|
||||
-export([call/2]).
|
||||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
|
@ -15,19 +13,15 @@
|
|||
%---------------------------------------------------------------------------
|
||||
|
||||
start(BrokerConfig) ->
|
||||
{ok, RpcClientPid} = gen_server:start(?MODULE, [BrokerConfig], []),
|
||||
RpcClientPid.
|
||||
|
||||
call(RpcClientPid, Payload) ->
|
||||
gen_server:call(RpcClientPid, Payload).
|
||||
gen_server:start(?MODULE, [BrokerConfig], []).
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% Plumbing
|
||||
%---------------------------------------------------------------------------
|
||||
|
||||
% Sets up a reply queue for this client to listen on
|
||||
setup_reply_queue(State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket}) ->
|
||||
QueueDeclare = #'queue.declare'{ticket = Ticket, queue = <<>>,
|
||||
setup_reply_queue(State = #rpc_client{channel_pid = ChannelPid, ticket = Ticket}) ->
|
||||
QueueDeclare = #'queue.declare'{ticket = Ticket, queue = [],
|
||||
passive = false, durable = false,
|
||||
exclusive = false, auto_delete = false,
|
||||
nowait = false, arguments = []},
|
||||
|
@ -35,62 +29,36 @@ setup_reply_queue(State = #rpc_client_state{channel_pid = ChannelPid, ticket = T
|
|||
message_count = MessageCount,
|
||||
consumer_count = ConsumerCount}
|
||||
= amqp_channel:call(ChannelPid, QueueDeclare),
|
||||
State#rpc_client_state{queue = Q}.
|
||||
State#rpc_client{queue = Q}.
|
||||
|
||||
% Sets up a consumer to handle rpc responses
|
||||
setup_consumer(State) ->
|
||||
ConsumerTag = amqp_rpc_util:register_consumer(State, self()),
|
||||
State#rpc_client_state{consumer_tag = ConsumerTag}.
|
||||
|
||||
% Publishes to the broker, stores the From address against
|
||||
% the correlation id and increments the correlationid for
|
||||
% the next request
|
||||
publish(Payload, From,
|
||||
State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket,
|
||||
exchange = X, routing_key = RoutingKey,
|
||||
queue = Q, correlation_id = CorrelationId,
|
||||
continuations = Continuations}) ->
|
||||
BasicPublish = #'basic.publish'{ticket = Ticket, exchange = X,
|
||||
routing_key = RoutingKey,
|
||||
mandatory = false, immediate = false},
|
||||
Props = #'P_basic'{correlation_id = list_to_binary(integer_to_list(CorrelationId)), reply_to = Q},
|
||||
Content = #content{class_id = 60, %% TODO HARDCODED VALUE
|
||||
properties = Props, properties_bin = 'none',
|
||||
payload_fragments_rev = [Payload]},
|
||||
io:format("RPC publish 1 q -> ~p~n",[CorrelationId]),
|
||||
amqp_channel:cast(ChannelPid, BasicPublish, Content),
|
||||
NewContinuations = dict:store(CorrelationId, From , Continuations),
|
||||
State#rpc_client_state{correlation_id = CorrelationId + 1, continuations = NewContinuations}.
|
||||
ConsumerTag = amqp_method_util:register_consumer(State, self()),
|
||||
State#rpc_client{consumer_tag = ConsumerTag}.
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% gen_server callbacks
|
||||
%---------------------------------------------------------------------------
|
||||
|
||||
% Sets up a reply queue and consumer within an existing channel
|
||||
init([BrokerConfig = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket}]) ->
|
||||
% Starts a new connection to the broker and opens up a new channel
|
||||
init([BrokerConfig = #rpc_client{channel_pid = ChannelPid, ticket = Ticket}]) ->
|
||||
State = setup_reply_queue(BrokerConfig),
|
||||
NewState = setup_consumer(State),
|
||||
{ok, NewState}.
|
||||
|
||||
% Closes the channel and the broker connection
|
||||
terminate(Reason, State) ->
|
||||
ok.
|
||||
amqp_aux:close_channel(State),
|
||||
amqp_aux:close_connection(State).
|
||||
|
||||
handle_call(Payload, From, State) ->
|
||||
NewState = publish(Payload, From, State),
|
||||
{noreply, NewState}.
|
||||
handle_call(manage, From, State = #rpc_client{channel_pid = ChannelPid}) ->
|
||||
Reply = amqp_channel:call(ChannelPid, []),
|
||||
{reply, Reply, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info({content, ClassId, Properties, PropertiesBin, Payload}, State) ->
|
||||
io:format("RPC bottom half: ~p~n",[Properties]),
|
||||
io:format("RPC bottom half: ~p~n",[PropertiesBin]),
|
||||
handle_info(Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
-module(amqp_rpc_handler).
|
||||
|
||||
-behaviour(gen_event).
|
||||
|
||||
-include_lib("rabbitmq_server/include/rabbit.hrl").
|
||||
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
|
||||
-include("amqp_client.hrl").
|
||||
-export([init/1, handle_info/2, terminate/2]).
|
||||
|
||||
%---------------------------------------------------------------------------
|
||||
% gen_event callbacks
|
||||
%---------------------------------------------------------------------------
|
||||
|
||||
init([BrokerConfig]) ->
|
||||
{ok, BrokerConfig}.
|
||||
|
||||
handle_info(shutdown, State) ->
|
||||
{remove_handler, State};
|
||||
|
||||
handle_info(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State) ->
|
||||
{ok, State};
|
||||
|
||||
handle_info(#'basic.cancel_ok'{consumer_tag = ConsumerTag}, State) ->
|
||||
{ok, State};
|
||||
|
||||
handle_info({content, ClassId, Properties, PropertiesBin, Payload},
|
||||
State = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket,
|
||||
exchange = X}) ->
|
||||
Props = #'P_basic'{correlation_id = CorrelationId, reply_to = Q}
|
||||
= rabbit_framing:decode_properties(ClassId, PropertiesBin),
|
||||
io:format("------>RPC handler corr id: ~p~n", [CorrelationId]),
|
||||
|
||||
BasicPublish = #'basic.publish'{ticket = Ticket, exchange = X,
|
||||
routing_key = Q,
|
||||
mandatory = false, immediate = false},
|
||||
ReplyProps = #'P_basic'{correlation_id = CorrelationId},
|
||||
Content = #content{class_id = 60, %% TODO HARDCODED VALUE
|
||||
properties = ReplyProps, properties_bin = 'none',
|
||||
payload_fragments_rev = [<<"f00bar">>]},
|
||||
amqp_channel:cast(ChannelPid, BasicPublish, Content),
|
||||
|
||||
{ok, State}.
|
||||
|
||||
terminate(Args, State) ->
|
||||
ok.
|
|
@ -3,7 +3,6 @@
|
|||
-include_lib("rabbitmq_server/include/rabbit.hrl").
|
||||
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include("amqp_client.hrl").
|
||||
|
||||
-compile([export_all]).
|
||||
|
||||
|
@ -104,29 +103,6 @@ basic_consume_test(Connection) ->
|
|||
end,
|
||||
teardown(Connection, Channel).
|
||||
|
||||
rpc_client_test(Connection) ->
|
||||
Realm = <<"/data">>,
|
||||
Q = <<"a.b.c">>,
|
||||
X = <<"x">>,
|
||||
BindKey = <<"a.b.c.*">>,
|
||||
RoutingKey = <<"a.b.c.d">>,
|
||||
{ChannelPid, Ticket} = setup_channel(Connection, Realm),
|
||||
{ok, Consumer} = gen_event:start_link(),
|
||||
gen_event:add_handler(Consumer, amqp_rpc_handler , [] ),
|
||||
BasicConsume = #'basic.consume'{ticket = Ticket, queue = Q,
|
||||
consumer_tag = <<"">>,
|
||||
no_local = false, no_ack = true, exclusive = false, nowait = false},
|
||||
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(ChannelPid, BasicConsume, Consumer),
|
||||
RpcClientState = #rpc_client_state{channel_pid = ChannelPid, ticket = Ticket,
|
||||
exchange = X, routing_key = RoutingKey,
|
||||
queue = Q},
|
||||
io:format("Before rpc client start~n"),
|
||||
RpcClientPid = amqp_rpc_client:start(RpcClientState),
|
||||
io:format("Before rpc client call~n"),
|
||||
Reply = amqp_rpc_client:call(RpcClientPid, <<"foo">>),
|
||||
io:format("Reply from RPC was ~p~n", [Reply]),
|
||||
teardown(Connection, ChannelPid).
|
||||
|
||||
setup_publish(Connection) ->
|
||||
Realm = <<"/data">>,
|
||||
Q = <<"a.b.c">>,
|
|
@ -24,7 +24,11 @@ message_payload(Message) ->
|
|||
(Message#basic_message.content)#content.payload_fragments_rev.
|
||||
|
||||
decode_method(Method, Content) ->
|
||||
Reply =
|
||||
case rabbit_framing_channel:finish_reading_method(Method,Content) of
|
||||
{ok, Method2, none} -> Method2;
|
||||
{ok, Method2, Content2} -> {Method2, Content2}
|
||||
end.
|
||||
{ok, _Method, none} ->
|
||||
_Method;
|
||||
{ok, _Method, _Content} ->
|
||||
{_Method,_Content}
|
||||
end,
|
||||
Reply.
|
||||
|
|
Loading…
Reference in New Issue