diff --git a/deps/amqp_client/src/amqp_channel_util.erl b/deps/amqp_client/src/amqp_channel_util.erl index af30a064ad..bc8a2a34f1 100755 --- a/deps/amqp_client/src/amqp_channel_util.erl +++ b/deps/amqp_client/src/amqp_channel_util.erl @@ -74,9 +74,9 @@ start_channel_infrastructure(network, ChannelNumber, {Sock, MainReader}) -> end, {FramingPid, WriterPid}; start_channel_infrastructure( - direct, ChannelNumber, #amqp_params{username = User, - virtual_host = VHost}) -> - Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost), + direct, ChannelNumber, {User, VHost, Collector}) -> + Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost, + Collector), {Peer, Peer}. terminate_channel_infrastructure(network, {FramingPid, WriterPid}) -> diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index aee6574486..e0a328f4fb 100755 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -35,7 +35,8 @@ -record(dc_state, {params = #amqp_params{}, closing = false, server_properties, - channels = amqp_channel_util:new_channel_dict()}). + channels = amqp_channel_util:new_channel_dict(), + queue_collector = none}). -record(dc_closing, {reason, close = none, %% At least one of close and reply has to be @@ -61,8 +62,10 @@ init(AmqpParams = #amqp_params{username = User, rabbit_access_control:check_vhost_access(#user{username = User, password = Pass}, VHost), + {ok, Collector} = rabbit_reader_queue_collector:start_link(), ServerProperties = rabbit_reader:server_properties(), {ok, #dc_state{params = AmqpParams, + queue_collector = Collector, server_properties = ServerProperties}}. %% Standard handling of an app initiated command @@ -106,10 +109,14 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- handle_command({open_channel, ProposedNumber}, _From, - State = #dc_state{params = Params, - channels = Channels}) -> + State = #dc_state{params = #amqp_params{username = User, + virtual_host = VHost}, + channels = Channels, + queue_collector = Collector}) -> + try amqp_channel_util:open_channel(ProposedNumber, ?MAX_CHANNEL_NUMBER, - direct, Params, Channels) of + direct, {User, VHost, Collector}, + Channels) of {ChannelPid, NewChannels} -> {reply, ChannelPid, State#dc_state{channels = NewChannels}} catch @@ -182,7 +189,11 @@ set_closing_state(ChannelCloseType, NewClosing, %% The all_channels_closed_event is called when all channels have been closed %% after the connection broadcasts a connection_closing message to all channels -all_channels_closed_event(#dc_state{closing = Closing} = State) -> +all_channels_closed_event(#dc_state{closing = Closing, + queue_collector = Collector} = State) -> + rabbit_reader_queue_collector:delete_all(Collector), + rabbit_reader_queue_collector:shutdown(Collector), + rabbit_misc:unlink_and_capture_exit(Collector), case Closing#dc_closing.from of none -> ok; From -> gen_server:reply(From, ok)