Merging bug 21824 onto default

This commit is contained in:
Matthew Sackman 2010-05-27 16:03:50 +01:00
commit b158b5c7f1
2 changed files with 19 additions and 8 deletions

View File

@ -74,9 +74,9 @@ start_channel_infrastructure(network, ChannelNumber, {Sock, MainReader}) ->
end,
{FramingPid, WriterPid};
start_channel_infrastructure(
direct, ChannelNumber, #amqp_params{username = User,
virtual_host = VHost}) ->
Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost),
direct, ChannelNumber, {User, VHost, Collector}) ->
Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost,
Collector),
{Peer, Peer}.
terminate_channel_infrastructure(network, {FramingPid, WriterPid}) ->

View File

@ -35,7 +35,8 @@
-record(dc_state, {params = #amqp_params{},
closing = false,
server_properties,
channels = amqp_channel_util:new_channel_dict()}).
channels = amqp_channel_util:new_channel_dict(),
queue_collector = none}).
-record(dc_closing, {reason,
close = none, %% At least one of close and reply has to be
@ -61,8 +62,10 @@ init(AmqpParams = #amqp_params{username = User,
rabbit_access_control:check_vhost_access(#user{username = User,
password = Pass},
VHost),
{ok, Collector} = rabbit_reader_queue_collector:start_link(),
ServerProperties = rabbit_reader:server_properties(),
{ok, #dc_state{params = AmqpParams,
queue_collector = Collector,
server_properties = ServerProperties}}.
%% Standard handling of an app initiated command
@ -106,10 +109,14 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
handle_command({open_channel, ProposedNumber}, _From,
State = #dc_state{params = Params,
channels = Channels}) ->
State = #dc_state{params = #amqp_params{username = User,
virtual_host = VHost},
channels = Channels,
queue_collector = Collector}) ->
try amqp_channel_util:open_channel(ProposedNumber, ?MAX_CHANNEL_NUMBER,
direct, Params, Channels) of
direct, {User, VHost, Collector},
Channels) of
{ChannelPid, NewChannels} ->
{reply, ChannelPid, State#dc_state{channels = NewChannels}}
catch
@ -182,7 +189,11 @@ set_closing_state(ChannelCloseType, NewClosing,
%% The all_channels_closed_event is called when all channels have been closed
%% after the connection broadcasts a connection_closing message to all channels
all_channels_closed_event(#dc_state{closing = Closing} = State) ->
all_channels_closed_event(#dc_state{closing = Closing,
queue_collector = Collector} = State) ->
rabbit_reader_queue_collector:delete_all(Collector),
rabbit_reader_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector),
case Closing#dc_closing.from of
none -> ok;
From -> gen_server:reply(From, ok)