Introduce amqp_connection:register_blocked_handler

For handling of connection.blocked and connection.unblocked
methods. Mimics the channel.flow handling API.

Incomplete, does not yet pass the tests. Pushed
for review.
This commit is contained in:
Michael Klishin 2013-06-26 16:21:40 +04:00
parent ccaec5b11a
commit aefddc0b9c
4 changed files with 76 additions and 5 deletions

View File

@ -69,7 +69,7 @@
-include("amqp_client_internal.hrl").
-export([open_channel/1, open_channel/2, open_channel/3]).
-export([open_channel/1, open_channel/2, open_channel/3, register_blocked_handler/2]).
-export([start/1, close/1, close/2, close/3]).
-export([error_atom/1]).
-export([info/2, info_keys/1, info_keys/0]).
@ -268,6 +268,9 @@ close(ConnectionPid, Code, Text, Timeout) ->
method_id = 0},
amqp_gen_connection:close(ConnectionPid, Close, Timeout).
register_blocked_handler(ConnectionPid, BlockHandler) ->
amqp_gen_connection:register_blocked_handler(ConnectionPid, BlockHandler).
%%---------------------------------------------------------------------------
%% Other functions
%%---------------------------------------------------------------------------

View File

@ -23,7 +23,8 @@
-export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
close/3, server_close/2, info/2, info_keys/0, info_keys/1]).
close/3, server_close/2, info/2, info_keys/0, info_keys/1,
register_blocked_handler/2]).
-export([behaviour_info/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
@ -40,6 +41,8 @@
server_properties,
start_infrastructure_fun,
start_channels_manager_fun,
%% connection.block, connection.unblock handler
block_handler,
closing = false %% #closing{} | false
}).
@ -164,7 +167,8 @@ init([Mod, Sup, AmqpParams, SIF, SChMF, ExtraParams]) ->
sup = Sup,
amqp_params = AmqpParams,
start_infrastructure_fun = SIF,
start_channels_manager_fun = SChMF}}.
start_channels_manager_fun = SChMF,
block_handler = none}}.
handle_call(connect, _From,
State0 = #state{module = Mod,
@ -215,8 +219,17 @@ handle_cast({channel_internal_error, Pid, Reason}, State) ->
handle_cast({server_misbehaved, AmqpError}, State) ->
server_misbehaved_close(AmqpError, State);
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
server_initiated_close(Close, State).
server_initiated_close(Close, State);
handle_cast({register_blocked_handler, HandlerPid}, State) ->
Ref = erlang:monitor(process, HandlerPid),
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
%% @private
handle_info({'DOWN', _, process, BlockHandler, Reason},
State = #state{block_handler = {BlockHandler, _Ref}}) ->
?LOG_WARN("Connection (~p): Unregistering block handler ~p because it died. "
"Reason: ~p~n", [self(), BlockHandler, Reason]),
{noreply, State#state{block_handler = none}};
handle_info(Info, State) ->
callback(handle_message, [Info], State).
@ -238,6 +251,13 @@ i(num_channels, State) -> amqp_channels_manager:num_channels(
State#state.channels_manager);
i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
%%---------------------------------------------------------------------------
%% connection.blocked, connection.unblocked
%%---------------------------------------------------------------------------
register_blocked_handler(Pid, HandlerPid) ->
gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).
%%---------------------------------------------------------------------------
%% Command handling
%%---------------------------------------------------------------------------
@ -263,6 +283,16 @@ handle_method(#'connection.close_ok'{}, State = #state{closing = Closing}) ->
#closing{from = From} -> gen_server:reply(From, ok)
end,
{stop, {shutdown, closing_to_reason(Closing)}, State};
handle_method(#'connection.blocked'{} = Blocked, State = #state{block_handler = BlockHandler}) ->
case BlockHandler of none -> ok;
{Pid, _Ref} -> Pid ! Blocked
end,
{noreply, State};
handle_method(#'connection.unblocked'{} = Unblocked, State = #state{block_handler = BlockHandler}) ->
case BlockHandler of none -> ok;
{Pid, _Ref} -> Pid ! Unblocked
end,
{noreply, State};
handle_method(Other, State) ->
server_misbehaved_close(#amqp_error{name = command_invalid,
explanation = "unexpected method on "

View File

@ -72,6 +72,7 @@ confirm_barrier_timeout_test_() -> ?RUN([]).
confirm_barrier_die_timeout_test_() -> ?RUN([]).
default_consumer_test_() -> ?RUN([]).
subscribe_nowait_test_() -> ?RUN([]).
connection_blocked_test_() -> ?RUN([]).
non_existent_exchange_test_() -> ?RUN([negative]).
bogus_rpc_test_() -> ?RUN([negative, repeat]).
@ -121,4 +122,3 @@ test_coverage() ->
rabbit_misc:enable_cover(),
test(),
rabbit_misc:report_cover().

View File

@ -1036,6 +1036,44 @@ rpc_client_consume_loop(Channel) ->
%%---------------------------------------------------------------------------
%% connection.blocked, connection.unblocked
connection_blocked_test() ->
{ok, Connection} = new_connection(),
X = <<"amq.direct">>,
K = Payload = <<"x">>,
memsup:set_sysmem_high_watermark(0.99),
timer:sleep(1000),
{ok, Channel} = amqp_connection:open_channel(Connection),
Parent = self(),
Child = spawn_link(
fun() ->
receive
#'connection.blocked'{} -> ok
end,
Publish = #'basic.publish'{exchange = X,
routing_key = K},
blocked =
amqp_channel:call(Channel, Publish,
#amqp_msg{payload = Payload}),
memsup:set_sysmem_high_watermark(5.0),
receive
#'connection.unblocked'{} -> ok
end,
Parent ! ok
end),
amqp_connection:register_blocked_handler(Channel, Child),
timer:sleep(1000),
memsup:set_sysmem_high_watermark(0.001),
receive
ok -> ok
after 10000 ->
?LOG_DEBUG("Are you sure that you have waited 1 minute?~n"),
exit(did_not_receive_connection_blocked)
end.
%%---------------------------------------------------------------------------
setup_publish(Channel) ->
Publish = #publish{routing_key = <<"a.b.c.d">>,
q = uuid(),