refactoring: rename "round robin" to "active consumers"

This commit is contained in:
Matthias Radestock 2009-06-10 23:50:40 +01:00
parent 096e6d4324
commit 5c21de9043
1 changed files with 31 additions and 27 deletions

View File

@ -53,7 +53,7 @@
has_had_consumers,
next_msg_id,
message_buffer,
round_robin,
active_consumers,
blocked_consumers}).
-record(consumer, {tag, ack_required}).
@ -100,7 +100,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
round_robin = queue:new(),
active_consumers = queue:new(),
blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
@ -167,14 +167,14 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
@ -190,29 +190,31 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
{NewRoundRobin, NewBlockedConsumers} =
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, RoundRobinTail),
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
block ->
{RoundRobin1, BlockedConsumers1} =
{ActiveConsumers1, BlockedConsumers1} =
move_consumers(ChPid,
RoundRobinTail,
ActiveConsumersTail,
BlockedConsumers),
{RoundRobin1,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
{offered, AckRequired,
State#q{round_robin = NewRoundRobin,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers,
next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
{NewRoundRobin, NewBlockedConsumers} =
move_consumers(ChPid, RoundRobin, BlockedConsumers),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
deliver_immediately(
Message, Delivered,
State#q{round_robin = NewRoundRobin,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
@ -277,12 +279,12 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedeConsumers, NewRoundRobin} =
unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.round_robin),
State#q.active_consumers),
run_poke_burst(
State#q{round_robin = NewRoundRobin,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedeConsumers})
end
end.
@ -312,7 +314,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
{ChPid, _} -> none;
Other -> Other
end,
round_robin = remove_consumers(ChPid, State#q.round_robin),
active_consumers = remove_consumers(
ChPid, State#q.active_consumers),
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
@ -360,7 +363,7 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
is_unused(State) -> queue:is_empty(State#q.round_robin) andalso
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
@ -521,7 +524,7 @@ i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
i(consumers, State) ->
queue:len(State#q.round_robin) + queue:len(State#q.blocked_consumers);
queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@ -639,10 +642,10 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
State1#q.blocked_consumers)};
false -> run_poke_burst(
State1#q{
round_robin =
active_consumers =
add_consumer(
ChPid, Consumer,
State1#q.round_robin)})
State1#q.active_consumers)})
end,
reply(ok, State2)
end
@ -666,9 +669,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q{exclusive_consumer = cancel_holder(ChPid,
ConsumerTag,
Holder),
round_robin = remove_consumer(
ChPid, ConsumerTag,
State#q.round_robin),
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers),
blocked_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.blocked_consumers)},
@ -680,8 +683,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
active_consumers = ActiveConsumers}) ->
reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->