Merged default into 18557

This commit is contained in:
Ben Hood 2008-12-18 16:46:58 +00:00
commit e044b1610c
6 changed files with 289 additions and 15 deletions

View File

@ -118,3 +118,81 @@ following target.
$ make test_direct
Running the channel flow tests
------------------------------
There are two tests for producer control flow. The first is a unit
test that asserts the reception of the correct chain of commands in
conjunction with a direct manipulation of the high water mark. The
second test does not make any assertion about the behavior of the
server but it does produce output that demonstrates that the client
library is indeed notifying higher level application code that flow
control has been activated or deactivated.
Both tests require that the memory alarms are turned on in the
server. By default they are turned off. To turn them on, set the
memory_alarms flag in the rabbit.app config file.
Because the unit test accesses memsup directly, it needs to use the
direct API and hence needs to run in the same VM as the server. To do
this from the rabbitmq-erlang-client directory, run the following
commmand (where SOME_DIRECTORY is some directory where you want mnesia
to log its files):
$ erl -pa ebin rabbitmq_server/ebin -mnesia dir SOME_DIRECTORY \
-boot start_sasl -s rabbit
When that has booted, you need to wait one minute for the memory
alarms to become active. After that, you can run the following from
the Erlang shell:
1> direct_client_test:test_channel_flow().
=INFO REPORT==== 17-Dec-2008::13:39:41 ===
alarm_handler: {set,{system_memory_high_watermark,[]}}
=INFO REPORT==== 17-Dec-2008::13:39:42 ===
alarm_handler: {clear,system_memory_high_watermark}
ok
2>
The non-unit test can be run in separate VM, because it uses the
network client driver. Whilst it can be run using the direct client,
it produces log output which makes it difficult to enter in commands
interactively (which you need to do to see the throttling).
After having booted an instance of the server with alarms handlers
turned on, run the following in the rabbitmq-erlang-client directory:
rabbitmq-erlang-client $ erl -pa rabbitmq_server/ebin/ ebin/
Erlang (BEAM) emulator version 5.6.3 [source] [smp:2] \
[async-threads:0][kernel-poll:false]
Eshell V5.6.3 (abort with ^G)
1> test_util:channel_flow_sync(lib_amqp:start_connection("localhost")).
{<0.39.0>,<0.40.0>}
After having done this, you should see output similar to this:
Producer (<0.39.0>) has sent about 0 messages since it started
Producer (<0.39.0>) has sent about 5000 messages since it started
Producer (<0.39.0>) has sent about 10000 messages since it started
To throttle the producer, go to the server shell and turn the memory
limit to some suitably low value:
2> memsup:set_sysmem_high_watermark(0.01).
ok
Back in the client shell, you should see the following output:
.....
Producer (<0.39.0>) has sent about 235000 messages since it started
Producer throttling ON
Producer (<0.39.0>) is blocked, will go to sleep.....ZZZ
If you now set the high water mark to say 99%, the producer should
wake up again:
Producer throttling OFF, waking up producer (<0.39.0>)
Producer (<0.39.0>) has woken up :-)
Producer (<0.39.0>) has sent about 240000 messages since it started
.....

View File

@ -46,6 +46,8 @@
tagged_sub_requests = dict:new(),
closing = false,
return_handler_pid,
flow_control = false,
flow_handler_pid,
consumers = dict:new()}).
-record(rpc_client_state, {broker_config,

View File

@ -34,8 +34,10 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]).
-export([subscribe/3]).
-export([register_direct_peer/2]).
-export([register_return_handler/2]).
-export([register_flow_handler/2]).
%% This diagram shows the interaction between the different component processes
%% in an AMQP client scenario.
@ -69,21 +71,28 @@
%% Generic AMQP RPC mechanism that expects a pseudo synchronous response
call(Channel, Method) ->
gen_server:call(Channel, {call, Method}).
%% Generic AMQP send mechanism with content
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content}).
%% Allows a consumer to be registered with the channel when invoking a BasicConsume
call(Channel, Method = #'basic.consume'{}, Consumer) ->
%% TODO This requires refactoring, because the handle_call callback
%% can perform the differentiation between tuples
gen_server:call(Channel, {basic_consume, Method, Consumer}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method) ->
gen_server:cast(Channel, {cast, Method}).
%% Generic AMQP send mechansim that doesn't expect a response
%% Generic AMQP send mechanism that doesn't expect a response
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}).
%---------------------------------------------------------------------------
% Consumer registration
%---------------------------------------------------------------------------
%% Registers a consumer pid with the channel
subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
gen_server:call(Channel, {BasicConsume, Consumer}).
%---------------------------------------------------------------------------
% Direct peer registration
%---------------------------------------------------------------------------
@ -103,6 +112,10 @@ register_direct_peer(Channel, Peer) ->
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
%% Registers a handler to deal with flow control
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
%---------------------------------------------------------------------------
% Internal plumbing
%---------------------------------------------------------------------------
@ -120,13 +133,13 @@ rpc_top_half(Method, From, State = #channel_state{writer_pid = Writer,
ok
end,
{noreply, NewState}.
rpc_bottom_half(#'channel.close'{reply_code = ReplyCode,
reply_text = ReplyText},State) ->
io:format("Channel received close from peer, code: ~p , message: ~p~n",[ReplyCode,ReplyText]),
NewState = channel_cleanup(State),
{stop, normal, NewState};
rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
rpc_requests = RequestQueue,
do2 = Do2}) ->
@ -210,6 +223,19 @@ handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
{noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
{stop, normal, NewState};
%% This handles the flow control flag that the broker initiates.
%% If defined, it informs the flow control handler to suspend submitting
%% any content bearing methods
handle_method(Flow = #'channel.flow'{active = Active},
State = #channel_state{writer_pid = Writer, do2 = Do2,
flow_handler_pid = FlowHandler}) ->
case FlowHandler of
undefined -> ok;
_ -> FlowHandler ! Flow
end,
Do2(Writer, #'channel.flow_ok'{active = Active}),
{noreply, State#channel_state{flow_control = not(Active)}};
handle_method(Method, State) ->
rpc_bottom_half(Method, State).
@ -243,9 +269,18 @@ init([InitialState]) ->
handle_call({call, Method}, From, State = #channel_state{closing = false}) ->
rpc_top_half(Method, From, State);
handle_call({call, _Method, _Content}, _From,
State = #channel_state{flow_control = true}) ->
{reply, blocked, State};
handle_call({call, Method, Content}, _From,
State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
Do3(Writer, Method, Content),
{reply, ok, State};
%% Top half of the basic consume process.
%% Sets up the consumer for registration in the bottom half of this RPC.
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{anon_sub_requests = Subs})
when Tag =:= undefined ; size(Tag) == 0 ->
NewSubs = queue:in({From,Consumer}, Subs),
@ -253,7 +288,7 @@ handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consu
NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
rpc_top_half(NewMethod, From, NewState);
handle_call({basic_consume, Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
handle_call({Method = #'basic.consume'{consumer_tag = Tag}, Consumer},
From, State = #channel_state{tagged_sub_requests = Subs})
when is_binary(Tag) ->
% TODO test whether this tag already exists, either in the pending tagged
@ -267,8 +302,17 @@ handle_cast({cast, Method}, State = #channel_state{writer_pid = Writer, do2 = Do
Do2(Writer, Method),
{noreply, State};
%% This discards any message submitted to the channel when flow control is
%% active
handle_cast({cast, Method, _Content},
State = #channel_state{flow_control = true}) ->
% Discard the message and log it
io:format("Discarding content bearing method (~p) ~n", [Method]),
{noreply, State};
%% Standard implementation of the cast/3 command
handle_cast({cast, Method, Content}, State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
handle_cast({cast, Method, Content},
State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
Do3(Writer, Method, Content),
{noreply, State};
@ -284,6 +328,11 @@ handle_cast({register_return_handler, ReturnHandler}, State) ->
NewState = State#channel_state{return_handler_pid = ReturnHandler},
{noreply, NewState};
%% Registers a handler to process flow control messages
handle_cast({register_flow_handler, FlowHandler}, State) ->
NewState = State#channel_state{flow_handler_pid = FlowHandler},
{noreply, NewState};
handle_cast({notify_sent, _Peer}, State) ->
{noreply, State}.

View File

@ -29,6 +29,7 @@
-define(RPC_SLEEP, 500).
-export([test_coverage/0]).
-export([test_channel_flow/0]).
-include_lib("eunit/include/eunit.hrl").
@ -47,6 +48,13 @@ lifecycle_test() -> test_util:lifecycle_test(new_connection()).
basic_ack_test() ->test_util:basic_ack_test(new_connection()).
command_serialization_test() -> test_util:command_serialization_test(new_connection()).
%----------------------------------------------------------------------------
% This must be kicked off manually because it can only be run after Rabbit
% has been running for 1 minute
test_channel_flow() ->
test_util:channel_flow_test(new_connection()).
%----------------------------------------------------------------------------
% Negative Tests

View File

@ -35,6 +35,17 @@ publish(Channel, X, RoutingKey, Payload) ->
publish(Channel, X, RoutingKey, Payload, false).
publish(Channel, X, RoutingKey, Payload, Mandatory) ->
publish_internal(fun amqp_channel:call/3,
Channel, X, RoutingKey, Payload, Mandatory).
async_publish(Channel, X, RoutingKey, Payload) ->
async_publish(Channel, X, RoutingKey, Payload, false).
async_publish(Channel, X, RoutingKey, Payload, Mandatory) ->
publish_internal(fun amqp_channel:cast/3,
Channel, X, RoutingKey, Payload, Mandatory).
publish_internal(Fun, Channel, X, RoutingKey, Payload, Mandatory) ->
BasicPublish = #'basic.publish'{exchange = X,
routing_key = RoutingKey,
mandatory = Mandatory, immediate = false},
@ -43,7 +54,7 @@ publish(Channel, X, RoutingKey, Payload, Mandatory) ->
properties = amqp_util:basic_properties(),
properties_bin = none,
payload_fragments_rev = [Payload]},
amqp_channel:cast(Channel, BasicPublish, Content).
Fun(Channel, BasicPublish, Content).
close_channel(Channel) ->
ChannelClose = #'channel.close'{reply_code = 200, reply_text = <<"Goodbye">>,
@ -95,7 +106,8 @@ subscribe(Channel, Q, Consumer, Tag, NoAck) ->
consumer_tag = Tag,
no_local = false, no_ack = NoAck,
exclusive = false, nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicConsume, Consumer),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:subscribe(Channel,BasicConsume, Consumer),
ConsumerTag.
unsubscribe(Channel, Tag) ->
@ -103,6 +115,9 @@ unsubscribe(Channel, Tag) ->
#'basic.cancel_ok'{} = amqp_channel:call(Channel,BasicCancel),
ok.
declare_queue(Channel) ->
declare_queue(Channel, <<>>).
declare_queue(Channel, Q) ->
QueueDeclare = #'queue.declare'{queue = Q,
passive = false, durable = false,

View File

@ -266,6 +266,128 @@ producer_loop(Channel, RoutingKey, N) ->
basic_reject_test(Connection) ->
lib_amqp:close_connection(Connection).
%----------------------------------------------------------------------------
% Unit test for the direct client
% This just relies on the fact that a fresh Rabbit VM must consume more than
% 0.1 pc of the system memory:
% 0. Wait 1 minute to let memsup do stuff
% 1. Make sure that the high watermark is set high
% 2. Start a process to receive the pause and resume commands from the broker
% 3. Register this as flow control notification handler
% 4. Let the system settle for a little bit
% 5. Set the threshold to the lowest possible value
% 6. When the flow handler receives the pause command, it sets the watermark
% to a high value in order to get the broker to send the resume command
% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and fail
channel_flow_test(Connection) ->
X = <<"amq.direct">>,
K = Payload = <<"x">>,
memsup:set_sysmem_high_watermark(0.99),
timer:sleep(1000),
Channel = lib_amqp:start_channel(Connection),
Parent = self(),
Child = spawn_link(
fun() ->
receive
#'channel.flow'{active = false} -> ok
end,
blocked = lib_amqp:publish(Channel, X, K, Payload),
memsup:set_sysmem_high_watermark(0.99),
receive
#'channel.flow'{active = true} -> ok
end,
Parent ! ok
end),
amqp_channel:register_flow_handler(Channel, Child),
timer:sleep(1000),
memsup:set_sysmem_high_watermark(0.001),
receive
ok -> ok
after 10000 ->
io:format("Are you sure that you have waited 1 minute?~n"),
exit(did_not_receive_channel_flow)
end.
%----------------------------------------------------------------------------
% This is a test, albeit not a unit test, to see if the producer
% handles the effect of being throttled.
channel_flow_sync(Connection) ->
start_channel_flow(Connection, fun lib_amqp:publish/4).
channel_flow_async(Connection) ->
start_channel_flow(Connection, fun lib_amqp:async_publish/4).
start_channel_flow(Connection, PublishFun) ->
X = <<"amq.direct">>,
Key = uuid(),
Producer = spawn_link(
fun() ->
Channel = lib_amqp:start_channel(Connection),
Parent = self(),
FlowHandler = spawn_link(fun() -> cf_handler_loop(Parent) end),
amqp_channel:register_flow_handler(Channel, FlowHandler),
Payload = << <<B:8>> || B <- lists:seq(1, 10000) >>,
cf_producer_loop(Channel, X, Key, PublishFun, Payload, 0)
end),
Consumer = spawn_link(
fun() ->
Channel = lib_amqp:start_channel(Connection),
Q = lib_amqp:declare_queue(Channel),
lib_amqp:bind_queue(Channel, X, Q, Key),
Tag = lib_amqp:subscribe(Channel, Q, self()),
cf_consumer_loop(Channel, Tag)
end),
{Producer, Consumer}.
cf_consumer_loop(Channel, Tag) ->
receive
#'basic.consume_ok'{} -> cf_consumer_loop(Channel, Tag);
#'basic.cancel_ok'{} -> ok;
{#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
lib_amqp:ack(Channel, DeliveryTag),
cf_consumer_loop(Channel, Tag);
stop ->
lib_amqp:unsubscribe(Channel, Tag),
ok
end.
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N)
when N rem 5000 =:= 0 ->
io:format("Producer (~p) has sent about ~p messages since it started~n",
[self(), N]),
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1);
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) ->
case PublishFun(Channel, X, Key, Payload) of
blocked ->
io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n",
[self()]),
receive
resume ->
io:format("Producer (~p) has woken up :-)~n", [self()]),
cf_producer_loop(Channel, X, Key,
PublishFun, Payload, N + 1)
end;
_ ->
cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1)
end.
cf_handler_loop(Producer) ->
receive
#'channel.flow'{active = false} ->
io:format("Producer throttling ON~n"),
cf_handler_loop(Producer);
#'channel.flow'{active = true} ->
io:format("Producer throttling OFF, waking up producer (~p)~n",
[Producer]),
Producer ! resume,
cf_handler_loop(Producer);
stop -> ok
end.
%----------------------------------------------------------------------------
setup_publish(Channel) ->
Publish = #publish{routing_key = <<"a.b.c.d">>,
q = <<"a.b.c">>,