Refrain from resetting message expiry in message requeue

This commit is contained in:
Emile Joubert 2011-10-20 16:22:35 +01:00
parent 8b267b22d9
commit 3a200963eb
7 changed files with 36 additions and 50 deletions

View File

@ -22,9 +22,6 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(confirm_required() :: boolean()).
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
@ -51,7 +48,7 @@
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).

View File

@ -554,9 +554,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
run_backing_queue(
BQ, fun (M, BQS) ->
{_MsgIds, BQS1} =
M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
BQS1
{_MsgIds, BQS1} = M:requeue(AckTags, BQS), BQS1
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@ -670,11 +668,6 @@ discard_delivery(#delivery{sender = ChPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
reset_msg_expiry_fun(TTL) ->
fun(MsgProps) ->
MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
end.
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.

View File

@ -107,7 +107,7 @@ behaviour_info(callbacks) ->
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
{requeue, 3},
{requeue, 2},
%% How long is my queue?
{len, 1},

View File

@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@ -248,11 +248,11 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->

View File

@ -827,14 +827,14 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
process_instruction({requeue, MsgPropsFun, MsgIds},
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
{MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 };
false ->

View File

@ -2238,11 +2238,10 @@ test_variable_queue_requeue(VQ0) ->
(_, Acc) ->
Acc
end, [], lists:zip(Acks, Seq)),
{_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
fun(X) -> X end, VQ3),
{_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
VQ5 = lists:foldl(fun (AckTag, VQN) ->
{_MsgId, VQM} = rabbit_variable_queue:requeue(
[AckTag], fun(X) -> X end, VQN),
[AckTag], VQN),
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
@ -2426,7 +2425,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),

View File

@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@ -628,21 +628,19 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
fun publish_alpha/2,
MsgPropsFun, State),
fun publish_alpha/2, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
fun publish_beta/2,
MsgPropsFun, State1),
fun publish_beta/2, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
MsgPropsFun, State2),
State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
State3 #vqstate { delta = Delta1,
@ -1335,50 +1333,49 @@ publish_beta(MsgStatus, State) ->
ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
Limit, PubFun, MsgPropsFun, State).
Limit, PubFun, State).
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
Limit, PubFun, MsgPropsFun, State)
Limit, PubFun, State)
when Limit == undefined orelse SeqId < Limit ->
case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
Limit, PubFun, MsgPropsFun, State);
Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
{MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
State),
{MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
Limit, PubFun, MsgPropsFun, State2)
Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, _MsgPropsFun, State) ->
_Limit, _PubFun, State) ->
{SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
delta_merge(SeqIds, Delta, MsgIds, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, MsgPropsFun, State0),
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
remove_pending_ack(SeqId, State),
{MsgStatus #msg_status {
msg_props = (MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false } }, State1}.
msg_props = MsgProps #message_properties { needs_confirming = false } },
State1}.
beta_limit(Q) ->
case ?QUEUE:peek(Q) of