Handle basic.credit_drained frame

This frame is very specific to what the AMQP 1.0 plugin requires to work
and was emitted but not handled correctly. This implementation simply
handles the frame and forwards it on to the consuming process.

[#161256187]
This commit is contained in:
kjnilsson 2018-10-24 17:15:44 +01:00
parent 7c5dadc93c
commit 0818e022d7
2 changed files with 10 additions and 2 deletions

View File

@ -796,6 +796,10 @@ handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
CH ! BasicNack,
{noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(#'basic.credit_drained'{} = CreditDrained, none,
#state{consumer = Consumer} = State) ->
Consumer ! CreditDrained,
{noreply, State};
handle_method_from_server1(Method, none, State) ->
{noreply, rpc_bottom_half(Method, State)};
handle_method_from_server1(Method, Content, State) ->

View File

@ -185,7 +185,10 @@ handle_info({'DOWN', _MRef, process, Pid, _Info},
_ -> {ok, State} %% unnamed consumer went down
%% before receiving consume_ok
end
end.
end;
handle_info(#'basic.credit_drained'{} = Method, State) ->
deliver_to_consumer_or_die(Method, Method, State),
{ok, State}.
%% @private
handle_call({register_default_consumer, Pid}, _From,
@ -252,7 +255,8 @@ tag(#'basic.consume'{consumer_tag = Tag}) -> Tag;
tag(#'basic.consume_ok'{consumer_tag = Tag}) -> Tag;
tag(#'basic.cancel'{consumer_tag = Tag}) -> Tag;
tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag.
tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag;
tag(#'basic.credit_drained'{consumer_tag = Tag}) -> Tag.
add_to_monitor_dict(Pid, Monitors) ->
case maps:find(Pid, Monitors) of