Introduce amqp_msg.channel_pid

The channel will be sending notifications rabbit_amqqueue:send_notify/2.
Consumers need to know the pid to tell it to do so.
This commit is contained in:
Michael Klishin 2014-08-04 10:44:08 +04:00
parent 6d0f806ba9
commit e1aed55e2a
2 changed files with 33 additions and 9 deletions

View File

@ -20,7 +20,15 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>, ref = make_ref()}).
%% ref and channel_pid are relevant in the context
%% of direct client
-record(amqp_msg, {props = #'P_basic'{},
payload = <<>>,
%% helps identify the message
ref = make_ref(),
%% pid of the amqp_channel this message
%% was delivered on
channel_pid}).
-record(amqp_params_network, {username = <<"guest">>,
password = <<"guest">>,

View File

@ -634,7 +634,8 @@ pre_do(_, _, _, State) ->
%% Handling of methods from the server
%%---------------------------------------------------------------------------
handle_method_from_server(Method, Content, Ref, State = #state{closing = Closing}) ->
handle_method_from_server(Method, Content, Ref,
State = #state{closing = Closing}) ->
case is_connection_method(Method) of
true -> server_misbehaved(
#amqp_error{name = command_invalid,
@ -652,8 +653,10 @@ handle_method_from_server(Method, Content, Ref, State = #state{closing = Closing
"server because channel is closing~n",
[self(), {Method, Content}]),
{noreply, State};
true -> handle_method_from_server1(Method,
amqp_msg(Content, Ref), State)
true ->
Msg = amqp_msg(Content, Ref, self()),
handle_method_from_server1(Method,
Msg, State)
end
end.
@ -811,16 +814,29 @@ flush_writer(#state{driver = network, writer = Writer}) ->
end;
flush_writer(#state{driver = direct}) ->
ok.
amqp_msg(none) ->
none;
amqp_msg(Content) ->
{Props, Payload} = rabbit_basic:from_content(Content),
#amqp_msg{props = Props, payload = Payload}.
amqp_msg(Content, none).
amqp_msg(none, _) ->
none;
amqp_msg(Content, none) ->
amqp_msg(Content);
amqp_msg(Content, Ref) ->
{Props, Payload} = rabbit_basic:from_content(Content),
#amqp_msg{props = Props, payload = Payload, ref = Ref}.
#amqp_msg{props = Props, payload = Payload};
amqp_msg(Content, ChPid) when is_pid(ChPid) ->
{Props, Payload} = rabbit_basic:from_content(Content),
#amqp_msg{props = Props, payload = Payload,
channel_pid = ChPid}.
amqp_msg(none, _, _) ->
none;
amqp_msg(Content, none, ChPid) when is_pid(ChPid) ->
amqp_msg(Content, ChPid);
amqp_msg(Content, Ref, ChPid) when is_pid(ChPid) ->
{Props, Payload} = rabbit_basic:from_content(Content),
#amqp_msg{props = Props, payload = Payload,
ref = Ref, channel_pid = ChPid}.
build_content(none) ->
none;