First stab at basic.qos

This commit is contained in:
Ben Hood 2008-11-18 19:40:02 +00:00
parent 3081245448
commit b9e461fc6d
3 changed files with 71 additions and 35 deletions

View File

@ -30,7 +30,7 @@
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
@ -82,8 +82,8 @@
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/7 ::
(amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
-spec(basic_consume/8 ::
(amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@ -238,10 +238,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server:call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
ConsumerTag, ExclusiveConsume, OkMsg}).
gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).

View File

@ -43,6 +43,7 @@
% Queue's state
-record(q, {q,
owner,
limiter_mapping,
exclusive_consumer,
has_had_consumers,
next_msg_id,
@ -75,6 +76,7 @@ init(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
limiter_mapping = dict:new(),
message_buffer = queue:new(),
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
@ -141,34 +143,61 @@ update_store_and_maybe_block_ch(
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
limiter_mapping = LimiterMapping,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
{{value, QEntry = {ChPid,
#consumer{tag = ConsumerTag,
ack_required = AckRequired = true}}},
RoundRobinTail} ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), NextId, Delivered, Message}),
C = #cr{unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
NewUAM = case AckRequired of
true -> dict:store(NextId, Message, UAM);
false -> UAM
end,
NewConsumers =
case update_store_and_maybe_block_ch(
C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM}) of
ok -> queue:in(QEntry, RoundRobinTail);
block_ch -> block_consumers(ChPid, RoundRobinTail)
end,
{offered, AckRequired, State#q{round_robin = NewConsumers,
next_msg_id = NextId +1}};
% Use Qos Limits if an ack is required
% Query the limiter to find out if a limit has been breached
LimiterPid = dict:fetch(ChPid, LimiterMapping),
case rabbit_limiter:can_send(LimiterPid, self()) of
true ->
really_deliver(AckRequired, ChPid, ConsumerTag,
Delivered, Message, NextId, QName,
QEntry, RoundRobinTail, State);
false ->
% Have another go by cycling through the consumer
% queue
NewConsumers = block_consumers(ChPid, RoundRobinTail),
deliver_immediately(Message, Delivered,
State#q{round_robin = NewConsumers})
end;
{{value, QEntry = {ChPid,
#consumer{tag = ConsumerTag,
ack_required = AckRequired = false}}},
RoundRobinTail} ->
really_deliver(AckRequired, ChPid, ConsumerTag,
Delivered, Message, NextId, QName,
QEntry, RoundRobinTail, State);
{empty, _} ->
not_offered
end.
% TODO The arity of this function seems a bit large :-(
really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
QName, QEntry, RoundRobinTail, State) ->
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), NextId, Delivered, Message}),
C = #cr{unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
NewUAM = case AckRequired of
true -> dict:store(NextId, Message, UAM);
false -> UAM
end,
NewConsumers =
case update_store_and_maybe_block_ch(
C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM}) of
ok -> queue:in(QEntry, RoundRobinTail);
block_ch -> block_consumers(ChPid, RoundRobinTail)
end,
{offered, AckRequired, State#q{round_robin = NewConsumers,
next_msg_id = NextId +1}}.
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@ -519,11 +548,14 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, _State = #q{owner = Owner,
limiter_mapping = Mapping,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
% TODO Remove the underscore in front of the first State variable
State = _State#q{limiter_mapping = dict:store(ChPid, LimiterPid, Mapping)},
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);

View File

@ -36,7 +36,7 @@
-record(ch, {state, proxy_pid, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
username, virtual_host, limiter,
most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
% TODO See point 3.1.1 of the design - start the limiter lazily
limiter = rabbit_limiter:start_link(),
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@ -323,6 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ proxy_pid = ProxyPid,
reader_pid = ReaderPid,
limiter = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@ -340,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
Q, NoAck, ReaderPid, ProxyPid,
Q, NoAck, ReaderPid, ProxyPid, LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@ -405,8 +408,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
handle_method(#'basic.qos'{}, _, State) ->
%% FIXME: Need to implement QOS
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter}) ->
Limiter ! {prefetch_count, PrefetchCount},
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.recover'{requeue = true},