merging in from default

This commit is contained in:
Vlad Alexandru Ionescu 2011-03-17 19:25:11 +00:00
commit 084239a7bf
5 changed files with 69 additions and 54 deletions

View File

@ -25,7 +25,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/3, connection_closing/2, open/1]). -export([start_link/3, connection_closing/3, open/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]). handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]). -export([call/2, call/3, cast/2, cast/3]).
@ -45,7 +45,8 @@
rpc_requests = queue:new(), rpc_requests = queue:new(),
anon_sub_requests = queue:new(), anon_sub_requests = queue:new(),
tagged_sub_requests = dict:new(), tagged_sub_requests = dict:new(),
closing = false, closing = false, %% false | just_channel |
%% {connection, Reason}
writer, writer,
return_handler_pid = none, return_handler_pid = none,
confirm_handler_pid = none, confirm_handler_pid = none,
@ -238,8 +239,8 @@ start_link(Driver, ChannelNumber, SWF) ->
gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SWF], []). gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SWF], []).
%% @private %% @private
connection_closing(Pid, ChannelCloseType) -> connection_closing(Pid, ChannelCloseType, Reason) ->
gen_server:cast(Pid, {connection_closing, ChannelCloseType}). gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}).
%% @private %% @private
open(Pid) -> open(Pid) ->
@ -318,14 +319,11 @@ handle_cast({method, Method, Content}, State) ->
%% beforehand. The channel must block all further RPCs, %% beforehand. The channel must block all further RPCs,
%% flush the RPC queue (optional), and terminate %% flush the RPC queue (optional), and terminate
%% @private %% @private
handle_cast({connection_closing, CloseType}, State) -> handle_cast({connection_closing, CloseType, Reason}, State) ->
handle_connection_closing(CloseType, State); handle_connection_closing(CloseType, Reason, State);
%% @private %% @private
handle_cast({shutdown, {_, 200, _}}, State) -> handle_cast({shutdown, Shutdown}, State) ->
{stop, normal, State}; handle_shutdown(Shutdown, State).
%% @private
handle_cast({shutdown, Reason}, State) ->
{stop, Reason, State}.
%% Received from rabbit_channel in the direct case %% Received from rabbit_channel in the direct case
%% @private %% @private
@ -511,9 +509,11 @@ do_rpc(State = #state{rpc_requests = Q,
end; end;
{empty, NewQ} -> {empty, NewQ} ->
case Closing of case Closing of
connection -> gen_server:cast(self(), {connection, Reason} ->
{shutdown, connection_closing}); gen_server:cast(self(),
_ -> ok {shutdown, {connection_closing, Reason}});
_ ->
ok
end, end,
State#state{rpc_requests = NewQ} State#state{rpc_requests = NewQ}
end. end.
@ -643,19 +643,21 @@ handle_method_from_server1(Method, Content, State) ->
%% Other handle_* functions %% Other handle_* functions
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
handle_connection_closing(CloseType, State = #state{rpc_requests = RpcQueue, handle_connection_closing(CloseType, Reason,
closing = Closing}) -> State = #state{rpc_requests = RpcQueue,
closing = Closing}) ->
NewState = State#state{closing = {connection, Reason}},
case {CloseType, Closing, queue:is_empty(RpcQueue)} of case {CloseType, Closing, queue:is_empty(RpcQueue)} of
{flush, false, false} -> {flush, false, false} ->
erlang:send_after(?TIMEOUT_FLUSH, self(), erlang:send_after(?TIMEOUT_FLUSH, self(),
timed_out_flushing_channel), timed_out_flushing_channel),
{noreply, State#state{closing = connection}}; {noreply, NewState};
{flush, just_channel, false} -> {flush, just_channel, false} ->
erlang:send_after(?TIMEOUT_CLOSE_OK, self(), erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
timed_out_waiting_close_ok), timed_out_waiting_close_ok),
{noreply, State#state{closing = connection}}; {noreply, NewState};
_ -> _ ->
{stop, connection_closing, State} handle_shutdown({connection_closing, Reason}, NewState)
end. end.
handle_channel_exit(Reason, State) -> handle_channel_exit(Reason, State) ->
@ -665,14 +667,22 @@ handle_channel_exit(Reason, State) ->
?LOG_WARN("Channel (~p) closing: server sent error ~p~n", ?LOG_WARN("Channel (~p) closing: server sent error ~p~n",
[self(), Reason]), [self(), Reason]),
{IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName), {IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
{stop, {if IsHard -> server_initiated_hard_close; {stop, if IsHard -> {connection_closing,
true -> server_initiated_close {server_initiated_hard_close, Code, Expl}};
end, Code, Expl}, State}; true -> {server_initiated_close, Code, Expl}
end, State};
%% Unexpected death of a channel infrastructure process %% Unexpected death of a channel infrastructure process
_ -> _ ->
{stop, {infrastructure_died, Reason}, State} {stop, {infrastructure_died, Reason}, State}
end. end.
handle_shutdown({_, 200, _}, State) ->
{stop, normal, State};
handle_shutdown({connection_closing, normal}, State) ->
{stop, normal, State};
handle_shutdown(Reason, State) ->
{stop, Reason, State}.
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% Internal plumbing %% Internal plumbing
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
@ -728,7 +738,7 @@ build_content(#amqp_msg{props = Props, payload = Payload}) ->
check_block(_Method, _AmqpMsg, #state{closing = just_channel}) -> check_block(_Method, _AmqpMsg, #state{closing = just_channel}) ->
closing; closing;
check_block(_Method, _AmqpMsg, #state{closing = connection}) -> check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
closing; closing;
check_block(_Method, none, #state{}) -> check_block(_Method, none, #state{}) ->
ok; ok;

View File

@ -22,7 +22,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1, -export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1,
num_channels/1, pass_frame/3, signal_connection_closing/2]). num_channels/1, pass_frame/3, signal_connection_closing/3]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]). handle_info/2]).
@ -55,8 +55,8 @@ num_channels(ChMgr) ->
pass_frame(ChMgr, ChNumber, Frame) -> pass_frame(ChMgr, ChNumber, Frame) ->
gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}). gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
signal_connection_closing(ChMgr, ChannelCloseType) -> signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
gen_server:cast(ChMgr, {connection_closing, ChannelCloseType}). gen_server:cast(ChMgr, {connection_closing, ChannelCloseType, Reason}).
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -83,8 +83,8 @@ handle_cast({set_channel_max, ChannelMax}, State) ->
{noreply, State#state{channel_max = ChannelMax}}; {noreply, State#state{channel_max = ChannelMax}};
handle_cast({pass_frame, ChNumber, Frame}, State) -> handle_cast({pass_frame, ChNumber, Frame}, State) ->
{noreply, internal_pass_frame(ChNumber, Frame, State)}; {noreply, internal_pass_frame(ChNumber, Frame, State)};
handle_cast({connection_closing, ChannelCloseType}, State) -> handle_cast({connection_closing, ChannelCloseType, Reason}, State) ->
handle_connection_closing(ChannelCloseType, State). handle_connection_closing(ChannelCloseType, Reason, State).
handle_info({'DOWN', _, process, Pid, Reason}, State) -> handle_info({'DOWN', _, process, Pid, Reason}, State) ->
handle_down(Pid, Reason, State). handle_down(Pid, Reason, State).
@ -159,11 +159,12 @@ maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) ->
ok; ok;
maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) -> maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) ->
ok; ok;
maybe_report_down(_Pid, connection_closing, _State) -> maybe_report_down(Pid, {connection_closing,
ok; {server_initiated_hard_close, _, _} = Reason},
maybe_report_down(Pid, {server_initiated_hard_close, _, _} = Reason,
#state{connection = Connection}) -> #state{connection = Connection}) ->
amqp_gen_connection:hard_error_in_channel(Connection, Pid, Reason); amqp_gen_connection:hard_error_in_channel(Connection, Pid, Reason);
maybe_report_down(_Pid, {connection_closing, _}, _State) ->
ok;
maybe_report_down(_Pid, {server_misbehaved, AmqpError}, maybe_report_down(_Pid, {server_misbehaved, AmqpError},
#state{connection = Connection}) -> #state{connection = Connection}) ->
amqp_gen_connection:server_misbehaved(Connection, AmqpError); amqp_gen_connection:server_misbehaved(Connection, AmqpError);
@ -179,11 +180,12 @@ check_all_channels_terminated(State = #state{closing = true,
false -> ok false -> ok
end. end.
handle_connection_closing(ChannelCloseType, handle_connection_closing(ChannelCloseType, Reason,
State = #state{connection = Connection}) -> State = #state{connection = Connection}) ->
case internal_is_empty(State) of case internal_is_empty(State) of
true -> amqp_gen_connection:channels_terminated(Connection); true -> amqp_gen_connection:channels_terminated(Connection);
false -> signal_channels_connection_closing(ChannelCloseType, State) false -> signal_channels_connection_closing(ChannelCloseType, Reason,
State)
end, end,
{noreply, State#state{closing = true}}. {noreply, State#state{closing = true}}.
@ -233,7 +235,7 @@ internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) -> internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}. State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}.
signal_channels_connection_closing(ChannelCloseType, signal_channels_connection_closing(ChannelCloseType, Reason,
#state{map_pid_num = MapPN}) -> #state{map_pid_num = MapPN}) ->
[amqp_channel:connection_closing(Pid, ChannelCloseType) [amqp_channel:connection_closing(Pid, ChannelCloseType, Reason)
|| Pid <- dict:fetch_keys(MapPN)]. || Pid <- dict:fetch_keys(MapPN)].

View File

@ -295,13 +295,15 @@ server_misbehaved_close(AmqpError, State) ->
set_closing_state(ChannelCloseType, NewClosing, set_closing_state(ChannelCloseType, NewClosing,
State = #state{channels_manager = ChMgr, State = #state{channels_manager = ChMgr,
closing = CurClosing}) -> closing = CurClosing}) ->
amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType),
ResClosing = ResClosing =
case closing_priority(NewClosing) =< closing_priority(CurClosing) of case closing_priority(NewClosing) =< closing_priority(CurClosing) of
true -> NewClosing; true -> NewClosing;
false -> CurClosing false -> CurClosing
end, end,
callback(closing, [ChannelCloseType, closing_to_reason(ResClosing)], ClosingReason = closing_to_reason(ResClosing),
amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType,
ClosingReason),
callback(closing, [ChannelCloseType, ClosingReason],
State#state{closing = ResClosing}). State#state{closing = ResClosing}).
closing_priority(false) -> 99; closing_priority(false) -> 99;

View File

@ -67,23 +67,24 @@ hard_error_test(Connection) ->
try amqp_channel:call(Channel, Qos) of try amqp_channel:call(Channel, Qos) of
_ -> exit(expected_to_exit) _ -> exit(expected_to_exit)
catch catch
exit:{connection_closing, _} -> %% Network case
%% Network case exit:{{connection_closing,
{server_initiated_close, ?NOT_IMPLEMENTED, _}}, _} ->
ok; ok;
exit:Reason -> %% Direct case
%% Direct case exit:{{connection_closing,
%% TODO: fix error code in the direct case {server_initiated_hard_close, ?NOT_IMPLEMENTED, _}}, _} ->
?assertMatch({{server_initiated_hard_close, ?NOT_IMPLEMENTED, _}, _}, ok
Reason)
end, end,
receive {'DOWN', OtherChannelMonitor, process, OtherChannel, OtherExit} -> receive
case OtherExit of %% Direct case
%% Direct case %% TODO fix error code in the direct case
%% TODO fix error code in the direct case {'DOWN', OtherChannelMonitor, process, OtherChannel, killed} ->
killed -> ok; ok;
%% Network case {'DOWN', OtherChannelMonitor, process, OtherChannel, OtherExit} ->
_ -> ?assertMatch(connection_closing, OtherExit) ?assertMatch({connection_closing,
end {server_initiated_close, ?NOT_IMPLEMENTED, _}},
OtherExit)
end, end,
test_util:wait_for_death(Channel), test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection). test_util:wait_for_death(Connection).

View File

@ -279,12 +279,12 @@ channel_multi_open_close_test(Connection) ->
closing -> ok closing -> ok
catch catch
exit:{noproc, _} -> ok; exit:{noproc, _} -> ok;
exit:{connection_closing, _} -> ok exit:{normal, _} -> ok
end; end;
closing -> ok closing -> ok
catch catch
exit:{noproc, _} -> ok; exit:{noproc, _} -> ok;
exit:{connection_closing, _} -> ok exit:{normal, _} -> ok
end end
end) || _ <- lists:seq(1, 50)], end) || _ <- lists:seq(1, 50)],
erlang:yield(), erlang:yield(),