Use direct client flow control instead of prefetch_count (bug 26619)
The hard coded prefetch count we had was a bit low, but we were only using it for flow control anyway, so let's use the more lightweight flow control mechanism we have now.
This commit is contained in:
parent
08637bbf14
commit
b5be81ed58
|
@ -57,11 +57,10 @@ init(Args) ->
|
|||
amqp_channel:call(
|
||||
Ch, #'queue.bind'{exchange = ?X, queue = Q,
|
||||
routing_key = pget(pattern, Args)}),
|
||||
#'basic.qos_ok'{} =
|
||||
amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 10}),
|
||||
amqp_channel:enable_delivery_flow_control(Ch),
|
||||
#'basic.consume_ok'{} =
|
||||
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
|
||||
no_ack = false}, self()),
|
||||
no_ack = true}, self()),
|
||||
{ok, Dir} = application:get_env(directory),
|
||||
Filename = Dir ++ "/" ++ binary_to_list(Name) ++ ".log",
|
||||
case filelib:ensure_dir(Filename) of
|
||||
|
@ -94,11 +93,11 @@ handle_call(_Req, _From, State) ->
|
|||
handle_cast(_C, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Delivery = {#'basic.deliver'{delivery_tag = Seq}, #amqp_msg{}},
|
||||
handle_info({BasicDeliver, Msg, DeliveryCtx},
|
||||
State = #state{ch = Ch, file = F, format = Format}) ->
|
||||
amqp_channel:notify_received(DeliveryCtx),
|
||||
Print = fun(Fmt, Args) -> io:format(F, Fmt, Args) end,
|
||||
log(Format, Print, delivery_to_log_record(Delivery)),
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Seq}),
|
||||
log(Format, Print, delivery_to_log_record({BasicDeliver, Msg})),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(_I, State) ->
|
||||
|
|
Loading…
Reference in New Issue