Switch to using prefetch rather than credit, that way we don't have to freeze the basic.credit methods. This has some minor implications:

* The basic.get hack is no longer possible, so we need to take it out. But everyone seemed to hate it anyway...
 * We no longer limit the rate of message ingress in no-ack mode. But that's what we do for federated exchanges.
 * The code gets a bit simpler. Woo.
This commit is contained in:
Simon MacMullen 2013-07-01 15:00:40 +01:00
parent 8852e78327
commit 8b29406c32
1 changed files with 39 additions and 71 deletions

View File

@ -21,7 +21,7 @@
-behaviour(gen_server2). -behaviour(gen_server2).
-export([start_link/1, go/0, run/1, pause/1, basic_get/1]). -export([start_link/1, go/0, run/1, pause/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -29,17 +29,14 @@
-import(rabbit_federation_util, [name/1]). -import(rabbit_federation_util, [name/1]).
-record(not_started, {queue, run, upstream, upstream_params}). -record(not_started, {queue, run, upstream, upstream_params}).
-record(state, {queue, run, conn, ch, dconn, dch, credit, ctag, -record(state, {queue, run, conn, ch, dconn, dch, upstream, upstream_params,
upstream, upstream_params, unacked}). unacked}).
-define(CREDIT_MORE_AT_RATIO, 0.25).
start_link(Args) -> start_link(Args) ->
gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]). gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
run(QName) -> cast(QName, run). run(QName) -> cast(QName, run).
pause(QName) -> cast(QName, pause). pause(QName) -> cast(QName, pause).
basic_get(QName) -> cast(QName, basic_get).
go() -> cast(go). go() -> cast(go).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -97,12 +94,12 @@ handle_cast(go, State = #not_started{}) ->
handle_cast(go, State) -> handle_cast(go, State) ->
{noreply, State}; {noreply, State};
handle_cast(run, State = #state{upstream = #upstream{prefetch_count = Prefetch}, handle_cast(run, State = #state{upstream = Upstream,
ch = Ch, run = false, ctag = CTag}) -> upstream_params = UParams,
amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag, ch = Ch,
credit = Prefetch, run = false}) ->
drain = false}), consume(Ch, Upstream, UParams#upstream_params.x_or_q),
{noreply, State#state{run = true, credit = Prefetch}}; {noreply, State#state{run = true}};
handle_cast(run, State = #not_started{}) -> handle_cast(run, State = #not_started{}) ->
{noreply, State#not_started{run = true}}; {noreply, State#not_started{run = true}};
@ -118,22 +115,9 @@ handle_cast(pause, State = #state{run = false}) ->
handle_cast(pause, State = #not_started{}) -> handle_cast(pause, State = #not_started{}) ->
{noreply, State#not_started{run = false}}; {noreply, State#not_started{run = false}};
handle_cast(pause, State = #state{ch = Ch, ctag = CTag}) -> handle_cast(pause, State = #state{ch = Ch}) ->
amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag, cancel(Ch),
credit = 0, {noreply, State#state{run = false}};
drain = false}),
{noreply, State#state{run = false, credit = 0}};
handle_cast(basic_get, State = #not_started{}) ->
{noreply, State};
handle_cast(basic_get, State = #state{ch = Ch, credit = Credit, ctag = CTag}) ->
Credit1 = Credit + 1,
amqp_channel:cast(
Ch, #'basic.credit'{consumer_tag = CTag,
credit = Credit1,
drain = false}),
{noreply, State#state{credit = Credit1}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}. {stop, {unexpected_cast, Msg}, State}.
@ -155,13 +139,9 @@ handle_info({#'basic.deliver'{redelivered = Redelivered} = DeliverMethod, Msg},
State = #state{queue = #amqqueue{name = QName}, State = #state{queue = #amqqueue{name = QName},
upstream = Upstream, upstream = Upstream,
upstream_params = UParams, upstream_params = UParams,
run = Run,
credit = Credit,
ctag = CTag,
ch = Ch, ch = Ch,
dch = DCh, dch = DCh,
unacked = Unacked}) -> unacked = Unacked}) ->
#upstream{prefetch_count = Prefetch} = Upstream,
PublishMethod = #'basic.publish'{exchange = <<"">>, PublishMethod = #'basic.publish'{exchange = <<"">>,
routing_key = QName#resource.name}, routing_key = QName#resource.name},
HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end, HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end,
@ -169,25 +149,8 @@ handle_info({#'basic.deliver'{redelivered = Redelivered} = DeliverMethod, Msg},
Unacked1 = rabbit_federation_link_util:forward( Unacked1 = rabbit_federation_link_util:forward(
Upstream, DeliverMethod, Ch, DCh, PublishMethod, Upstream, DeliverMethod, Ch, DCh, PublishMethod,
HeadersFun, ForwardFun, Msg, Unacked), HeadersFun, ForwardFun, Msg, Unacked),
%% TODO we could also hook this up to internal credit
%% TODO actually we could reject when 'stopped' %% TODO actually we could reject when 'stopped'
CreditMoreAt = trunc(Prefetch * ?CREDIT_MORE_AT_RATIO), {noreply, State#state{unacked = Unacked1}};
Credit1 = case Run of
false -> Credit - 1;
true -> case Credit of
I when I < CreditMoreAt ->
More = Prefetch - CreditMoreAt,
amqp_channel:cast(
Ch, #'basic.credit'{consumer_tag = CTag,
credit = More,
drain = false}),
I - 1 + More;
I ->
I - 1
end
end,
{noreply, State#state{credit = Credit1,
unacked = Unacked1}};
handle_info(#'basic.cancel'{}, handle_info(#'basic.cancel'{},
State = #state{queue = #amqqueue{name = QName}, State = #state{queue = #amqqueue{name = QName},
@ -230,41 +193,33 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
go(S0 = #not_started{run = Run, go(S0 = #not_started{run = Run,
upstream = Upstream, upstream = Upstream = #upstream{
prefetch_count = Prefetch},
upstream_params = UParams, upstream_params = UParams,
queue = Queue = #amqqueue{name = QName}}) -> queue = Queue = #amqqueue{name = QName}}) ->
#upstream_params{x_or_q = UQueue = #amqqueue{ #upstream_params{x_or_q = UQueue = #amqqueue{
durable = Durable, durable = Durable,
auto_delete = AutoDelete, auto_delete = AutoDelete,
arguments = QArgs}} = UParams, arguments = Args}} = UParams,
Credit = case Run of
true -> Upstream#upstream.prefetch_count;
false -> 0
end,
CArgs = [{<<"x-priority">>, long, -1},
{<<"x-credit">>, table, [{<<"credit">>, long, Credit},
{<<"drain">>, bool, false}]}],
Unacked = rabbit_federation_link_util:unacked_new(), Unacked = rabbit_federation_link_util:unacked_new(),
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
rabbit_federation_link_util:start_conn_ch( rabbit_federation_link_util:start_conn_ch(
fun (Conn, Ch, DConn, DCh) -> fun (Conn, Ch, DConn, DCh) ->
amqp_channel:call(Ch, #'queue.declare'{queue = name(UQueue), amqp_channel:call(Ch, #'queue.declare'{queue = name(UQueue),
durable = Durable, durable = Durable,
auto_delete = AutoDelete, auto_delete = AutoDelete,
arguments = QArgs}), arguments = Args}),
#'basic.consume_ok'{consumer_tag = CTag} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch}),
amqp_channel:call(
Ch, #'basic.consume'{queue = name(UQueue), case Run of
no_ack = NoAck, true -> consume(Ch, Upstream, UQueue);
arguments = CArgs}), false -> ok
end,
{noreply, #state{queue = Queue, {noreply, #state{queue = Queue,
run = Run, run = Run,
conn = Conn, conn = Conn,
ch = Ch, ch = Ch,
dconn = DConn, dconn = DConn,
dch = DCh, dch = DCh,
credit = Credit,
ctag = CTag,
upstream = Upstream, upstream = Upstream,
upstream_params = UParams, upstream_params = UParams,
unacked = Unacked}} unacked = Unacked}}
@ -306,3 +261,16 @@ visit_match({table, T}, Info) ->
end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]); end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]);
visit_match(_ ,_) -> visit_match(_ ,_) ->
false. false.
consume(Ch, Upstream, UQueue) ->
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
amqp_channel:cast(
Ch, #'basic.consume'{queue = name(UQueue),
no_ack = NoAck,
nowait = true,
consumer_tag = <<"consumer">>,
arguments = [{<<"x-priority">>, long, -1}]}).
cancel(Ch) ->
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
consumer_tag = <<"consumer">>}).