Pluggable queues land

This commit is contained in:
Matthew Sackman 2010-04-08 16:05:08 +01:00
parent 23e259db14
commit 332d920fa5
7 changed files with 202 additions and 108 deletions

View File

@ -19,6 +19,7 @@
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
{msg_store_index_module, rabbit_msg_store_ets_index},
{queue_internal_queue_module, rabbit_variable_queue},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},

View File

@ -0,0 +1,55 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2009 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-spec(init/2 :: (queue_name(), pid() | atom()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(publish/2 :: (basic_message(), state()) -> state()).
-spec(publish_delivered/2 :: (basic_message(), state()) -> {ack(), state()}).
-spec(set_queue_ram_duration_target/2 ::
(('undefined' | 'infinity' | number()), state()) -> state()).
-spec(remeasure_rates/1 :: (state()) -> state()).
-spec(ram_duration/1 :: (state()) -> number()).
-spec(fetch/1 :: (state()) ->
{('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(purge/1 :: (state()) -> {non_neg_integer(), state()}).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()).
-spec(tx_publish/2 :: (basic_message(), state()) -> state()).
-spec(tx_rollback/2 :: ([msg_id()], state()) -> state()).
-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, state()) ->
{boolean(), state()}).
-spec(needs_sync/1 :: (state()) -> ('undefined' | {atom(), [any()]})).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).

View File

@ -1,44 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-record(delta,
{ start_seq_id,
count,
end_seq_id %% note the end_seq_id is always >, not >=
}).
-ifdef(use_specs).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
end_seq_id :: non_neg_integer() }).
-endif.

View File

@ -114,11 +114,14 @@ init(Q) ->
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
{ok, InternalQueueModule} =
application:get_env(queue_internal_queue_module),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
internal_queue = rabbit_variable_queue,
internal_queue = InternalQueueModule,
internal_queue_state = undefined,
internal_queue_timeout_fun = undefined,
next_msg_id = 1,
@ -387,7 +390,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
{_SeqId, IQS} = IQ:publish(Message, State #q.internal_queue_state),
IQS = IQ:publish(Message, State #q.internal_queue_state),
{false, NewState #q { internal_queue_state = IQS }}
end.

View File

@ -0,0 +1,97 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2009 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_internal_queue_type).
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[
%% Called with queue name and the persistent msg_store to
%% use. Transient store is in ?TRANSIENT_MSG_STORE
{init, 2},
%% Called on queue shutdown when queue isn't being deleted
{terminate, 1},
%% Called when the queue is terminating and needs to delete all
%% its content.
{delete_and_terminate, 1},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
{purge, 1},
%% Publish a message
{publish, 2},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the internal queue).
{publish_delivered, 2},
{fetch, 1},
{ack, 2},
{tx_publish, 2},
{tx_rollback, 2},
{tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were (likely) pending acks.q
{requeue, 2},
{len, 1},
{is_empty, 1},
{set_queue_ram_duration_target, 2},
{remeasure_rates, 1},
{ram_duration, 1},
%% Can return 'undefined' or a function atom name plus list of
%% arguments to be invoked in the internal queue module as soon
%% as the queue process can manage (either on an empty mailbox,
%% or when a timer fires).
{needs_sync, 1},
%% Called immediately before the queue hibernates
{handle_pre_hibernate, 1},
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues internal_queue_status
{status, 1}
];
behaviour_info(_Other) ->
undefined.

View File

@ -41,7 +41,6 @@
-import(lists).
-include("rabbit.hrl").
-include("rabbit_queue.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@ -1348,14 +1347,13 @@ test_queue_index() ->
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
fun (_N, {Acc, VQ1}) ->
{SeqId, VQ2} = rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, [], <<>>, rabbit_guid:guid(),
IsPersistent), VQ1),
{[SeqId | Acc], VQ2}
end, {[], VQ}, lists:seq(1, Count)).
fun (_N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, [], <<>>, rabbit_guid:guid(),
IsPersistent), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@ -1377,9 +1375,7 @@ fresh_variable_queue() ->
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
assert_prop(S0, delta, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }),
assert_prop(S0, delta, {delta, undefined, 0, undefined}),
assert_prop(S0, q3, 0),
assert_prop(S0, q4, 0),
VQ.
@ -1394,7 +1390,7 @@ test_variable_queue_dynamic_duration_change() ->
VQ0 = fresh_variable_queue(),
%% start by sending in a couple of segments worth
Len1 = 2*SegmentSize,
{_SeqIds, VQ1} = variable_queue_publish(false, Len1, VQ0),
VQ1 = variable_queue_publish(false, Len1, VQ0),
VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
{ok, _TRef} = timer:send_after(1000, {duration, 60,
fun (V) -> (V*0.75)-1 end}),
@ -1406,7 +1402,7 @@ test_variable_queue_dynamic_duration_change() ->
%% just publish and fetch some persistent msgs, this hits the the
%% partial segment path in queue_index due to the period when
%% duration was 0 and the entire queue was delta.
{_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
VQ7 = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9),
@ -1417,7 +1413,7 @@ test_variable_queue_dynamic_duration_change() ->
passed.
test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
{_SeqIds, VQ1} = variable_queue_publish(false, 1, VQ0),
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1),
VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
receive
@ -1444,27 +1440,24 @@ test_variable_queue_partial_segments_delta_thing() ->
SegmentSize = rabbit_queue_index:segment_size(),
HalfSegment = SegmentSize div 2,
VQ0 = fresh_variable_queue(),
{_SeqIds, VQ1} =
variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2),
%% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
assert_prop(S3, delta, #delta { start_seq_id = SegmentSize,
count = HalfSegment,
end_seq_id = SegmentSize + HalfSegment }),
assert_prop(S3, delta, {delta, SegmentSize, HalfSegment,
SegmentSize + HalfSegment}),
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3),
{[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4),
VQ5 = variable_queue_publish(true, 1, VQ4),
%% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
io:format("~p~n", [S5]),
assert_prop(S5, q1, 1),
assert_prop(S5, delta, #delta { start_seq_id = SegmentSize,
count = HalfSegment,
end_seq_id = SegmentSize + HalfSegment }),
assert_prop(S5, delta, {delta, SegmentSize, HalfSegment,
SegmentSize + HalfSegment}),
assert_prop(S5, q3, SegmentSize),
assert_prop(S5, len, SegmentSize + HalfSegment + 1),
{VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
@ -1472,9 +1465,7 @@ test_variable_queue_partial_segments_delta_thing() ->
%% the half segment should now be in q3 as betas
S6 = rabbit_variable_queue:status(VQ6),
io:format("~p~n", [S6]),
assert_prop(S6, delta, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }),
assert_prop(S6, delta, {delta, undefined, 0, undefined}),
assert_prop(S6, q1, 1),
assert_prop(S6, q3, HalfSegment),
assert_prop(S6, len, HalfSegment + 1),

View File

@ -133,6 +133,8 @@
%%----------------------------------------------------------------------------
-behaviour(rabbit_internal_queue_type).
-record(vqstate,
{ q1,
q2,
@ -162,7 +164,6 @@
}).
-include("rabbit.hrl").
-include("rabbit_queue.hrl").
-record(msg_status,
{ msg,
@ -174,6 +175,12 @@
index_on_disk
}).
-record(delta,
{ start_seq_id,
count,
end_seq_id %% note the end_seq_id is always >, not >=
}).
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
%% betas that we must be due to write indices for before we do any
@ -187,12 +194,17 @@
-ifdef(use_specs).
-type(msg_id() :: binary()).
-type(bpqueue() :: any()).
-type(msg_id() :: binary()).
-type(seq_id() :: non_neg_integer()).
-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()}
| 'ack_not_on_disk').
-type(vqstate() :: #vqstate {
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
q1 :: queue(),
q2 :: bpqueue(),
delta :: delta(),
@ -220,36 +232,12 @@
transient_threshold :: non_neg_integer()
}).
-spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()).
-spec(terminate/1 :: (vqstate()) -> vqstate()).
-spec(publish/2 :: (basic_message(), vqstate()) ->
{seq_id(), vqstate()}).
-spec(publish_delivered/2 :: (basic_message(), vqstate()) ->
{ack(), vqstate()}).
-spec(set_queue_ram_duration_target/2 ::
(('undefined' | 'infinity' | number()), vqstate()) -> vqstate()).
-spec(remeasure_rates/1 :: (vqstate()) -> vqstate()).
-spec(ram_duration/1 :: (vqstate()) -> number()).
-spec(fetch/1 :: (vqstate()) ->
{('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
vqstate()}).
-spec(ack/2 :: ([ack()], vqstate()) -> vqstate()).
-spec(len/1 :: (vqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (vqstate()) -> boolean()).
-spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}).
-spec(delete_and_terminate/1 :: (vqstate()) -> vqstate()).
-spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()).
-spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()).
-spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()).
-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) ->
{boolean(), vqstate()}).
-spec(tx_commit_post_msg_store/5 ::
(boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) ->
{boolean(), vqstate()}).
-spec(tx_commit_index/1 :: (vqstate()) -> {boolean(), vqstate()}).
-spec(needs_sync/1 :: (vqstate()) -> ('undefined' | {atom(), [any()]})).
-spec(handle_pre_hibernate/1 :: (vqstate()) -> vqstate()).
-spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
(boolean(), [msg_id()], [ack()], {pid(), any()}, state()) ->
{boolean(), state()}).
-spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}).
-include("rabbit_internal_queue_type_spec.hrl").
-endif.
@ -321,7 +309,8 @@ terminate(State = #vqstate {
publish(Msg, State) ->
State1 = limit_ram_index(State),
publish(Msg, false, false, State1).
{_SeqId, State2} = publish(Msg, false, false, State1),
State2.
publish_delivered(Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@ -553,7 +542,8 @@ requeue(MsgsWithAckTags, State) ->
rabbit_misc:dict_cons(MsgStore, MsgId, Dict),
true}
end,
{_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
{_SeqId, StateN1} =
publish(Msg, true, MsgOnDisk, StateN),
{SeqIdsAcc1, Dict1, StateN1}
end, {[], dict:new(), State}, MsgsWithAckTags),
IndexState1 =
@ -648,7 +638,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
{SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN),
{SeqId, StateN1} =
publish(Msg, false, IsPersistent, StateN),
{case IsPersistentStore andalso IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc