Fix up direct connections to use the queue collector process to stay compatible with the server.
This commit is contained in:
parent
10660f2c70
commit
dd1945956b
|
|
@ -74,9 +74,9 @@ start_channel_infrastructure(network, ChannelNumber, {Sock, MainReader}) ->
|
||||||
end,
|
end,
|
||||||
{FramingPid, WriterPid};
|
{FramingPid, WriterPid};
|
||||||
start_channel_infrastructure(
|
start_channel_infrastructure(
|
||||||
direct, ChannelNumber, #amqp_params{username = User,
|
direct, ChannelNumber, {User, VHost, Collector}) ->
|
||||||
virtual_host = VHost}) ->
|
Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost,
|
||||||
Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost),
|
Collector),
|
||||||
{Peer, Peer}.
|
{Peer, Peer}.
|
||||||
|
|
||||||
terminate_channel_infrastructure(network, {FramingPid, WriterPid}) ->
|
terminate_channel_infrastructure(network, {FramingPid, WriterPid}) ->
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,8 @@
|
||||||
-record(dc_state, {params = #amqp_params{},
|
-record(dc_state, {params = #amqp_params{},
|
||||||
closing = false,
|
closing = false,
|
||||||
server_properties,
|
server_properties,
|
||||||
channels = amqp_channel_util:new_channel_dict()}).
|
channels = amqp_channel_util:new_channel_dict(),
|
||||||
|
queue_collector = none}).
|
||||||
|
|
||||||
-record(dc_closing, {reason,
|
-record(dc_closing, {reason,
|
||||||
close = none, %% At least one of close and reply has to be
|
close = none, %% At least one of close and reply has to be
|
||||||
|
|
@ -61,8 +62,10 @@ init(AmqpParams = #amqp_params{username = User,
|
||||||
rabbit_access_control:check_vhost_access(#user{username = User,
|
rabbit_access_control:check_vhost_access(#user{username = User,
|
||||||
password = Pass},
|
password = Pass},
|
||||||
VHost),
|
VHost),
|
||||||
|
{ok, Collector} = rabbit_reader_queue_collector:start_link(),
|
||||||
ServerProperties = rabbit_reader:server_properties(),
|
ServerProperties = rabbit_reader:server_properties(),
|
||||||
{ok, #dc_state{params = AmqpParams,
|
{ok, #dc_state{params = AmqpParams,
|
||||||
|
queue_collector = Collector,
|
||||||
server_properties = ServerProperties}}.
|
server_properties = ServerProperties}}.
|
||||||
|
|
||||||
%% Standard handling of an app initiated command
|
%% Standard handling of an app initiated command
|
||||||
|
|
@ -106,10 +109,14 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%---------------------------------------------------------------------------
|
%%---------------------------------------------------------------------------
|
||||||
|
|
||||||
handle_command({open_channel, ProposedNumber}, _From,
|
handle_command({open_channel, ProposedNumber}, _From,
|
||||||
State = #dc_state{params = Params,
|
State = #dc_state{params = #amqp_params{username = User,
|
||||||
channels = Channels}) ->
|
virtual_host=VHost},
|
||||||
|
channels = Channels,
|
||||||
|
queue_collector = Collector}) ->
|
||||||
|
|
||||||
try amqp_channel_util:open_channel(ProposedNumber, ?MAX_CHANNEL_NUMBER,
|
try amqp_channel_util:open_channel(ProposedNumber, ?MAX_CHANNEL_NUMBER,
|
||||||
direct, Params, Channels) of
|
direct, {User, VHost, Collector},
|
||||||
|
Channels) of
|
||||||
{ChannelPid, NewChannels} ->
|
{ChannelPid, NewChannels} ->
|
||||||
{reply, ChannelPid, State#dc_state{channels = NewChannels}}
|
{reply, ChannelPid, State#dc_state{channels = NewChannels}}
|
||||||
catch
|
catch
|
||||||
|
|
@ -182,7 +189,11 @@ set_closing_state(ChannelCloseType, NewClosing,
|
||||||
|
|
||||||
%% The all_channels_closed_event is called when all channels have been closed
|
%% The all_channels_closed_event is called when all channels have been closed
|
||||||
%% after the connection broadcasts a connection_closing message to all channels
|
%% after the connection broadcasts a connection_closing message to all channels
|
||||||
all_channels_closed_event(#dc_state{closing = Closing} = State) ->
|
all_channels_closed_event(#dc_state{closing = Closing,
|
||||||
|
queue_collector = Collector} = State) ->
|
||||||
|
rabbit_reader_queue_collector:delete_all(Collector),
|
||||||
|
rabbit_reader_queue_collector:shutdown(Collector),
|
||||||
|
rabbit_misc:unlink_and_capture_exit(Collector),
|
||||||
case Closing#dc_closing.from of
|
case Closing#dc_closing.from of
|
||||||
none -> ok;
|
none -> ok;
|
||||||
From -> gen_server:reply(From, ok)
|
From -> gen_server:reply(From, ok)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue