1993 lines
78 KiB
Erlang
1993 lines
78 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(backing_queue_SUITE).
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
-include("amqqueue.hrl").
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
|
|
-define(TRANSIENT_MSG_STORE, msg_store_transient).
|
|
|
|
-define(TIMEOUT, 30000).
|
|
-define(VHOST, <<"/">>).
|
|
|
|
-define(VARIABLE_QUEUE_TESTCASES, [
|
|
variable_queue_partial_segments_delta_thing,
|
|
variable_queue_all_the_bits_not_covered_elsewhere_A,
|
|
variable_queue_all_the_bits_not_covered_elsewhere_B,
|
|
variable_queue_drop,
|
|
variable_queue_fold_msg_on_disk,
|
|
variable_queue_dropfetchwhile,
|
|
variable_queue_dropwhile_restart,
|
|
variable_queue_dropwhile_sync_restart,
|
|
variable_queue_restart_large_seq_id,
|
|
variable_queue_ack_limiting,
|
|
variable_queue_purge,
|
|
variable_queue_requeue,
|
|
variable_queue_requeue_ram_beta,
|
|
variable_queue_fold
|
|
]).
|
|
|
|
-define(BACKING_QUEUE_TESTCASES, [
|
|
bq_queue_index,
|
|
bq_queue_index_props,
|
|
{variable_queue_default, [parallel], ?VARIABLE_QUEUE_TESTCASES},
|
|
bq_variable_queue_delete_msg_store_files_callback,
|
|
bq_queue_recover
|
|
]).
|
|
|
|
all() ->
|
|
[
|
|
{group, backing_queue_tests}
|
|
].
|
|
|
|
groups() ->
|
|
Common = [
|
|
{backing_queue_embed_limit_0, [], ?BACKING_QUEUE_TESTCASES},
|
|
{backing_queue_embed_limit_1024, [], ?BACKING_QUEUE_TESTCASES}
|
|
],
|
|
V2Only = [
|
|
v2_delete_segment_file_completely_acked,
|
|
v2_delete_segment_file_partially_acked,
|
|
v2_delete_segment_file_partially_acked_with_holes
|
|
],
|
|
[
|
|
{backing_queue_tests, [], [
|
|
msg_store,
|
|
msg_store_read_many_fanout,
|
|
msg_store_file_scan,
|
|
{backing_queue_v2, [], Common ++ V2Only}
|
|
]}
|
|
].
|
|
|
|
group(backing_queue_tests) ->
|
|
[
|
|
%% Several tests based on lazy queues may take more than 30 minutes.
|
|
{timetrap, {hours, 1}}
|
|
];
|
|
group(_) ->
|
|
[].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
rabbit_ct_helpers:log_environment(),
|
|
rabbit_ct_helpers:run_setup_steps(Config).
|
|
|
|
end_per_suite(Config) ->
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
init_per_group(Group, Config) ->
|
|
case lists:member({group, Group}, all()) of
|
|
true ->
|
|
ClusterSize = 1,
|
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
|
{rmq_nodename_suffix, Group},
|
|
{rmq_nodes_count, ClusterSize}
|
|
]),
|
|
rabbit_ct_helpers:run_steps(Config1,
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
rabbit_ct_client_helpers:setup_steps() ++ [
|
|
fun(C) -> init_per_group1(Group, C) end
|
|
]);
|
|
false ->
|
|
rabbit_ct_helpers:run_steps(Config, [
|
|
fun(C) -> init_per_group1(Group, C) end
|
|
])
|
|
end.
|
|
|
|
init_per_group1(backing_queue_tests, Config) ->
|
|
%% @todo Is that test still relevant?
|
|
Module = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
application, get_env, [rabbit, backing_queue_module]),
|
|
case Module of
|
|
{ok, rabbit_priority_queue} ->
|
|
rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, setup_backing_queue_test_group, [Config]);
|
|
_ ->
|
|
{skip, rabbit_misc:format(
|
|
"Backing queue module not supported by this test group: ~tp~n",
|
|
[Module])}
|
|
end;
|
|
init_per_group1(backing_queue_embed_limit_0, Config) ->
|
|
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
application, set_env, [rabbit, queue_index_embed_msgs_below, 0]),
|
|
Config;
|
|
init_per_group1(backing_queue_embed_limit_1024, Config) ->
|
|
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
application, set_env, [rabbit, queue_index_embed_msgs_below, 1024]),
|
|
Config;
|
|
init_per_group1(variable_queue_default, Config) ->
|
|
rabbit_ct_helpers:set_config(Config, {variable_queue_type, default});
|
|
%% @todo These groups are no longer used?
|
|
init_per_group1(from_cluster_node1, Config) ->
|
|
rabbit_ct_helpers:set_config(Config, {test_direction, {0, 1}});
|
|
init_per_group1(from_cluster_node2, Config) ->
|
|
rabbit_ct_helpers:set_config(Config, {test_direction, {1, 0}});
|
|
init_per_group1(_, Config) ->
|
|
Config.
|
|
|
|
end_per_group(Group, Config) ->
|
|
case lists:member({group, Group}, all()) of
|
|
true ->
|
|
rabbit_ct_helpers:run_steps(Config,
|
|
[fun(C) -> end_per_group1(Group, C) end] ++
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
rabbit_ct_broker_helpers:teardown_steps());
|
|
false ->
|
|
Config
|
|
end.
|
|
|
|
end_per_group1(backing_queue_tests, Config) ->
|
|
rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, teardown_backing_queue_test_group, [Config]);
|
|
end_per_group1(Group, Config)
|
|
when Group =:= backing_queue_embed_limit_0
|
|
orelse Group =:= backing_queue_embed_limit_1024 ->
|
|
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
application, set_env, [rabbit, queue_index_embed_msgs_below,
|
|
?config(rmq_queue_index_embed_msgs_below, Config)]),
|
|
Config;
|
|
end_per_group1(_, Config) ->
|
|
Config.
|
|
|
|
init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
|
|
Testcase == variable_queue_fold ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase);
|
|
init_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
|
|
|
end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
|
|
Testcase == variable_queue_fold ->
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase);
|
|
end_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Message store.
|
|
%% -------------------------------------------------------------------
|
|
|
|
msg_store(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, msg_store1, [Config]).
|
|
|
|
msg_store1(_Config) ->
|
|
%% We simulate the SeqId (used as a message ref for the flying optimisation)
|
|
%% using the process dictionary.
|
|
GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
|
|
GenRef = fun() -> GenRefFun(msc) end,
|
|
restart_msg_store_empty(),
|
|
MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1,100)],
|
|
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
|
|
Ref = rabbit_guid:gen(),
|
|
{Cap, MSCState} = msg_store_client_init_capture(
|
|
?PERSISTENT_MSG_STORE, Ref),
|
|
Ref2 = rabbit_guid:gen(),
|
|
{Cap2, MSC2State} = msg_store_client_init_capture(
|
|
?PERSISTENT_MSG_STORE, Ref2),
|
|
%% check we don't contain any of the msgs we're about to publish
|
|
false = msg_store_contains(false, MsgIds, MSCState),
|
|
%% test confirm logic
|
|
passed = test_msg_store_confirms([hd(MsgIds)], Cap, GenRef, MSCState),
|
|
%% check we don't contain any of the msgs we're about to publish
|
|
false = msg_store_contains(false, MsgIds, MSCState),
|
|
%% publish the first half
|
|
ok = msg_store_write(MsgIds1stHalf, MSCState),
|
|
%% sync on the first half
|
|
ok = on_disk_await(Cap, MsgIds1stHalf),
|
|
%% publish the second half
|
|
ok = msg_store_write(MsgIds2ndHalf, MSCState),
|
|
%% check they're all in there
|
|
true = msg_store_contains(true, MsgIds, MSCState),
|
|
%% publish the latter half twice so we hit the caching and ref
|
|
%% count code. We need to do this through a 2nd client since a
|
|
%% single client is not supposed to write the same message more
|
|
%% than once without first removing it.
|
|
ok = msg_store_write([{GenRefFun(msc2), MsgId} || {_, MsgId} <- MsgIds2ndHalf], MSC2State),
|
|
%% check they're still all in there
|
|
true = msg_store_contains(true, MsgIds, MSCState),
|
|
%% sync on the 2nd half
|
|
ok = on_disk_await(Cap2, MsgIds2ndHalf),
|
|
%% cleanup
|
|
ok = on_disk_stop(Cap2),
|
|
ok = rabbit_msg_store:client_delete_and_terminate(MSC2State),
|
|
ok = on_disk_stop(Cap),
|
|
%% read them all
|
|
MSCState1 = msg_store_read(MsgIds, MSCState),
|
|
%% read them all again - this will hit the cache, not disk
|
|
MSCState2 = msg_store_read(MsgIds, MSCState1),
|
|
%% remove them all
|
|
{ok, _} = msg_store_remove(MsgIds, MSCState2),
|
|
%% check first half doesn't exist
|
|
false = msg_store_contains(false, MsgIds1stHalf, MSCState2),
|
|
%% check second half does exist
|
|
true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
|
|
%% read the second half again
|
|
MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
|
|
%% read the second half again, just for fun (aka code coverage)
|
|
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
|
|
ok = rabbit_msg_store:client_terminate(MSCState4),
|
|
%% stop and restart, preserving every other msg in 2nd half
|
|
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
|
|
ok = rabbit_variable_queue:start_msg_store(?VHOST,
|
|
[], {fun ([]) -> finished;
|
|
([{_, MsgId}|MsgIdsTail])
|
|
when length(MsgIdsTail) rem 2 == 0 ->
|
|
{MsgId, 1, MsgIdsTail};
|
|
([{_, MsgId}|MsgIdsTail]) ->
|
|
{MsgId, 0, MsgIdsTail}
|
|
end, MsgIds2ndHalf}),
|
|
MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
|
|
%% check we have the right msgs left
|
|
lists:foldl(
|
|
fun ({_, MsgId}, Bool) ->
|
|
not(Bool = rabbit_msg_store:contains(MsgId, MSCState5))
|
|
end, false, MsgIds2ndHalf),
|
|
ok = rabbit_msg_store:client_terminate(MSCState5),
|
|
%% restart empty
|
|
restart_msg_store_empty(),
|
|
MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
|
|
%% check we don't contain any of the msgs
|
|
false = msg_store_contains(false, MsgIds, MSCState6),
|
|
%% publish the first half again
|
|
ok = msg_store_write(MsgIds1stHalf, MSCState6),
|
|
%% this should force some sort of sync internally otherwise misread
|
|
ok = rabbit_msg_store:client_terminate(
|
|
msg_store_read(MsgIds1stHalf, MSCState6)),
|
|
MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
|
|
{ok, _} = msg_store_remove(MsgIds1stHalf, MSCState7),
|
|
ok = rabbit_msg_store:client_terminate(MSCState7),
|
|
%% restart empty
|
|
restart_msg_store_empty(), %% now safe to reuse msg_ids
|
|
%% push a lot of msgs in... at least 100 files worth
|
|
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
|
|
PayloadSizeBits = 65536,
|
|
BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
|
|
MsgIdsBig = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, BigCount)],
|
|
Payload = << 0:PayloadSizeBits >>,
|
|
ok = with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, Ref,
|
|
fun (MSCStateM) ->
|
|
[ok = rabbit_msg_store:write(SeqId, MsgId, Payload, MSCStateM) ||
|
|
{SeqId, MsgId} <- MsgIdsBig],
|
|
MSCStateM
|
|
end),
|
|
%% now read them to ensure we hit the fast client-side reading
|
|
ok = foreach_with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, Ref,
|
|
fun ({_, MsgId}, MSCStateM) ->
|
|
{{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
|
|
MsgId, MSCStateM),
|
|
MSCStateN
|
|
end, MsgIdsBig),
|
|
%% We remove every other other message first, then do it again a second
|
|
%% time with another set of messages and then a third time. We start
|
|
%% with younger messages on purpose. So we split the list in three
|
|
%% lists keeping the message reference.
|
|
Part = fun
|
|
PartFun([], _, Acc) ->
|
|
Acc;
|
|
PartFun([E|Tail], N, Acc) ->
|
|
Pos = 1 + (N rem 3),
|
|
AccL = element(Pos, Acc),
|
|
PartFun(Tail, N + 1, setelement(Pos, Acc, [E|AccL]))
|
|
end,
|
|
{One, Two, Three} = Part(MsgIdsBig, 0, {[], [], []}),
|
|
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, One),
|
|
%% This is likely to hit GC (under 50% good data left in files, but no empty files).
|
|
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Two),
|
|
%% Files are empty now and will get removed.
|
|
ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Three),
|
|
%% ensure empty
|
|
ok = with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, Ref,
|
|
fun (MSCStateM) ->
|
|
false = msg_store_contains(false, MsgIdsBig, MSCStateM),
|
|
MSCStateM
|
|
end),
|
|
%%
|
|
passed = test_msg_store_client_delete_and_terminate(fun() -> GenRefFun(msc_cdat) end),
|
|
%% restart empty
|
|
restart_msg_store_empty(),
|
|
passed.
|
|
|
|
msg_store_read_many_fanout(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, msg_store_read_many_fanout1, [Config]).
|
|
|
|
msg_store_read_many_fanout1(_Config) ->
|
|
GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
|
|
GenRef = fun() -> GenRefFun(msc) end,
|
|
%% We will fill the first message store file with random messages
|
|
%% + 1 fanout message (written once for now). We will then write
|
|
%% two messages from our queue, then the fanout message (to +1
|
|
%% from our queue), and two more messages. We expect all messages
|
|
%% from our queue to be in the current write file, except the
|
|
%% fanout message. We then try to read the messages.
|
|
restart_msg_store_empty(),
|
|
CRef1 = rabbit_guid:gen(),
|
|
CRef2 = rabbit_guid:gen(),
|
|
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
|
|
PayloadSizeBits = 65536,
|
|
Payload = <<0:PayloadSizeBits>>,
|
|
%% @todo -7 because -1 and -hd, fix better.
|
|
NumRandomMsgs = (FileSize div (PayloadSizeBits div 8)) - 1,
|
|
RandomMsgIds = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, NumRandomMsgs)],
|
|
FanoutMsgId = {GenRef(), msg_id_bin(NumRandomMsgs + 1)},
|
|
[Q1, Q2, Q3, Q4] = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(NumRandomMsgs + 2, NumRandomMsgs + 5)],
|
|
QueueMsgIds0 = [Q1, Q2] ++ [FanoutMsgId] ++ [Q3, Q4],
|
|
QueueMsgIds = [{GenRef(), M} || {_, M} <- QueueMsgIds0],
|
|
BasicMsgFun = fun(MsgId) ->
|
|
Ex = rabbit_misc:r(<<>>, exchange, <<>>),
|
|
BasicMsg = rabbit_basic:message(Ex, <<>>,
|
|
#'P_basic'{delivery_mode = 2},
|
|
Payload),
|
|
{ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
|
|
mc:set_annotation(id, MsgId, Msg0)
|
|
end,
|
|
ok = with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, CRef1,
|
|
fun (MSCStateM) ->
|
|
[begin
|
|
Msg = BasicMsgFun(MsgId),
|
|
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
|
end || {SeqId, MsgId} <- [FanoutMsgId] ++ RandomMsgIds],
|
|
MSCStateM
|
|
end),
|
|
ok = with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, CRef2,
|
|
fun (MSCStateM) ->
|
|
[begin
|
|
Msg = BasicMsgFun(MsgId),
|
|
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
|
|
end || {SeqId, MsgId} <- QueueMsgIds],
|
|
MSCStateM
|
|
end),
|
|
ok = with_msg_store_client(
|
|
?PERSISTENT_MSG_STORE, CRef2,
|
|
fun (MSCStateM) ->
|
|
QueueOnlyMsgIds = [M || {_, M} <- QueueMsgIds],
|
|
{#{}, MSCStateN} = rabbit_msg_store:read_many(
|
|
QueueOnlyMsgIds, MSCStateM),
|
|
MSCStateN
|
|
end),
|
|
passed.
|
|
|
|
restart_msg_store_empty() ->
|
|
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
|
|
ok = rabbit_variable_queue:start_msg_store(?VHOST,
|
|
undefined, {fun (ok) -> finished end, ok}).
|
|
|
|
msg_id_bin(X) ->
|
|
erlang:md5(term_to_binary(X)).
|
|
|
|
on_disk_capture() ->
|
|
receive
|
|
{await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
|
|
stop -> done
|
|
end.
|
|
|
|
on_disk_capture([_|_], _Awaiting, Pid) ->
|
|
Pid ! {self(), surplus};
|
|
on_disk_capture(OnDisk, Awaiting, Pid) ->
|
|
receive
|
|
{on_disk, MsgIdsS} ->
|
|
MsgIds = sets:to_list(MsgIdsS),
|
|
on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
|
|
Pid);
|
|
stop ->
|
|
done
|
|
after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
|
|
case Awaiting of
|
|
[] -> Pid ! {self(), arrived}, on_disk_capture();
|
|
_ -> Pid ! {self(), timeout}
|
|
end
|
|
end.
|
|
|
|
on_disk_await(Pid, MsgIds0) when is_list(MsgIds0) ->
|
|
{_, MsgIds} = lists:unzip(MsgIds0),
|
|
Pid ! {await, MsgIds, self()},
|
|
receive
|
|
{Pid, arrived} -> ok;
|
|
{Pid, Error} -> Error
|
|
end.
|
|
|
|
on_disk_stop(Pid) ->
|
|
MRef = erlang:monitor(process, Pid),
|
|
Pid ! stop,
|
|
receive {'DOWN', MRef, process, Pid, _Reason} ->
|
|
ok
|
|
end.
|
|
|
|
msg_store_client_init_capture(MsgStore, Ref) ->
|
|
Pid = spawn(fun on_disk_capture/0),
|
|
{Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref,
|
|
fun (MsgIds, _ActionTaken) ->
|
|
Pid ! {on_disk, MsgIds}
|
|
end)}.
|
|
|
|
msg_store_contains(Atom, MsgIds, MSCState) ->
|
|
Atom = lists:foldl(
|
|
fun ({_, MsgId}, Atom1) when Atom1 =:= Atom ->
|
|
rabbit_msg_store:contains(MsgId, MSCState) end,
|
|
Atom, MsgIds).
|
|
|
|
msg_store_read(MsgIds, MSCState) ->
|
|
lists:foldl(fun ({_, MsgId}, MSCStateM) ->
|
|
{{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
|
|
MsgId, MSCStateM),
|
|
MSCStateN
|
|
end, MSCState, MsgIds).
|
|
|
|
msg_store_write(MsgIds, MSCState) ->
|
|
ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
|
|
rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState)
|
|
end, ok, MsgIds).
|
|
|
|
msg_store_write_flow(MsgIds, MSCState) ->
|
|
ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
|
|
rabbit_msg_store:write_flow(SeqId, MsgId, MsgId, MSCState)
|
|
end, ok, MsgIds).
|
|
|
|
msg_store_remove(MsgIds, MSCState) ->
|
|
rabbit_msg_store:remove(MsgIds, MSCState).
|
|
|
|
msg_store_remove(MsgStore, Ref, MsgIds) ->
|
|
with_msg_store_client(MsgStore, Ref,
|
|
fun (MSCStateM) ->
|
|
{ok, _} = msg_store_remove(MsgIds, MSCStateM),
|
|
MSCStateM
|
|
end).
|
|
|
|
with_msg_store_client(MsgStore, Ref, Fun) ->
|
|
rabbit_msg_store:client_terminate(
|
|
Fun(msg_store_client_init(MsgStore, Ref))).
|
|
|
|
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
|
|
rabbit_msg_store:client_terminate(
|
|
lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end,
|
|
msg_store_client_init(MsgStore, Ref), L)).
|
|
|
|
test_msg_store_confirms(MsgIds, Cap, GenRef, MSCState) ->
|
|
%% write -> confirmed
|
|
MsgIds1 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds1, MSCState),
|
|
ok = on_disk_await(Cap, MsgIds1),
|
|
%% remove -> _
|
|
{ok, []} = msg_store_remove(MsgIds1, MSCState),
|
|
ok = on_disk_await(Cap, []),
|
|
%% write, remove -> confirmed
|
|
MsgIds2 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds2, MSCState),
|
|
{ok, ConfirmedMsgIds2} = msg_store_remove(MsgIds2, MSCState),
|
|
ok = on_disk_await(Cap, lists:filter(fun({_, MsgId}) -> not lists:member(MsgId, ConfirmedMsgIds2) end, MsgIds2)),
|
|
%% write, remove, write -> confirmed, confirmed
|
|
MsgIds3 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds3, MSCState),
|
|
{ok, ConfirmedMsgIds3} = msg_store_remove(MsgIds3, MSCState),
|
|
MsgIds4 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds4, MSCState),
|
|
ok = on_disk_await(Cap, lists:filter(fun({_, MsgId}) -> not lists:member(MsgId, ConfirmedMsgIds3) end, MsgIds3) ++ MsgIds4),
|
|
%% remove, write -> confirmed
|
|
{ok, []} = msg_store_remove(MsgIds4, MSCState),
|
|
MsgIds5 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds5, MSCState),
|
|
ok = on_disk_await(Cap, MsgIds5),
|
|
%% remove, write, remove -> confirmed
|
|
{ok, []} = msg_store_remove(MsgIds5, MSCState),
|
|
MsgIds6 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
|
|
ok = msg_store_write(MsgIds6, MSCState),
|
|
{ok, ConfirmedMsgIds6} = msg_store_remove(MsgIds6, MSCState),
|
|
ok = on_disk_await(Cap, lists:filter(fun({_, MsgId}) -> not lists:member(MsgId, ConfirmedMsgIds6) end, MsgIds6)),
|
|
%% confirmation on timer-based sync
|
|
passed = test_msg_store_confirm_timer(GenRef),
|
|
passed.
|
|
|
|
test_msg_store_confirm_timer(GenRef) ->
|
|
Ref = rabbit_guid:gen(),
|
|
MsgId = msg_id_bin(1),
|
|
Self = self(),
|
|
MSCState = rabbit_vhost_msg_store:client_init(
|
|
?VHOST,
|
|
?PERSISTENT_MSG_STORE,
|
|
Ref,
|
|
fun (MsgIds, _ActionTaken) ->
|
|
case sets:is_element(MsgId, MsgIds) of
|
|
true -> Self ! on_disk;
|
|
false -> ok
|
|
end
|
|
end),
|
|
MsgIdsChecked = [{GenRef(), MsgId}],
|
|
ok = msg_store_write(MsgIdsChecked, MSCState),
|
|
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], GenRef, MSCState, false),
|
|
{ok, _} = msg_store_remove(MsgIdsChecked, MSCState),
|
|
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
|
|
passed.
|
|
|
|
msg_store_keep_busy_until_confirm(MsgIds, GenRef, MSCState, Blocked) ->
|
|
After = case Blocked of
|
|
false -> 0;
|
|
true -> ?MAX_WAIT
|
|
end,
|
|
Recurse = fun () -> msg_store_keep_busy_until_confirm(
|
|
MsgIds, GenRef, MSCState, credit_flow:blocked()) end,
|
|
receive
|
|
on_disk -> ok;
|
|
{bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
|
|
Recurse()
|
|
after After ->
|
|
MsgIds1 = [{GenRef(), MsgId} || MsgId <- MsgIds],
|
|
ok = msg_store_write_flow(MsgIds1, MSCState),
|
|
{ok, _} = msg_store_remove(MsgIds1, MSCState),
|
|
Recurse()
|
|
end.
|
|
|
|
test_msg_store_client_delete_and_terminate(GenRef) ->
|
|
restart_msg_store_empty(),
|
|
MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 10)],
|
|
Ref = rabbit_guid:gen(),
|
|
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
|
|
ok = msg_store_write(MsgIds, MSCState),
|
|
%% test the 'dying client' fast path for writes
|
|
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
|
|
passed.
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Message store file scanning.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% While it is possible although very unlikely that this test case
|
|
%% produces false positives, all failures of this test case should
|
|
%% be investigated thoroughly as they test an algorithm that is
|
|
%% central to the reliability of the data in the shared message store.
|
|
%% Failing files can be found in the CT private data.
|
|
msg_store_file_scan(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, msg_store_file_scan1, [Config]).
|
|
|
|
msg_store_file_scan1(Config) ->
|
|
Scan = fun (Blocks) ->
|
|
Expected = gen_result(Blocks),
|
|
Path = gen_msg_file(Config, Blocks),
|
|
Result = rabbit_msg_store:scan_file_for_valid_messages(Path),
|
|
ok = file:delete(Path),
|
|
case Result of
|
|
Expected -> ok;
|
|
_ -> {expected, Expected, got, Result}
|
|
end
|
|
end,
|
|
%% Empty files.
|
|
ok = Scan([]),
|
|
ok = Scan([{pad, 1024}]),
|
|
ok = Scan([{pad, 1024 * 1024}]),
|
|
%% One-message files.
|
|
ok = Scan([{msg, gen_id(), <<0>>}]),
|
|
ok = Scan([{msg, gen_id(), <<255>>}]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()}]),
|
|
ok = Scan([{pad, 1024}, {msg, gen_id(), gen_msg()}]),
|
|
ok = Scan([{pad, 1024 * 1024}, {msg, gen_id(), gen_msg()}]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024}]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024 * 1024}]),
|
|
%% Multiple messages.
|
|
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 2)]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 5)]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 20)]),
|
|
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 100)]),
|
|
%% Multiple messages with padding.
|
|
ok = Scan([
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{msg, gen_id(), gen_msg()}
|
|
]),
|
|
ok = Scan([
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()}
|
|
]),
|
|
ok = Scan([
|
|
{msg, gen_id(), gen_msg()},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024}
|
|
]),
|
|
ok = Scan([
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()}
|
|
]),
|
|
ok = Scan([
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024}
|
|
]),
|
|
ok = Scan([
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024}
|
|
]),
|
|
ok = Scan([
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024},
|
|
{msg, gen_id(), gen_msg()},
|
|
{pad, 1024}
|
|
]),
|
|
OneOf = fun(A, B) ->
|
|
case rand:uniform() of
|
|
F when F < +0.5 -> A;
|
|
_ -> B
|
|
end
|
|
end,
|
|
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 2)]),
|
|
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 5)]),
|
|
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 20)]),
|
|
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)]),
|
|
%% Duplicate messages.
|
|
Msg = {msg, gen_id(), gen_msg()},
|
|
ok = Scan([Msg, Msg]),
|
|
ok = Scan([Msg, Msg, Msg, Msg, Msg]),
|
|
ok = Scan([Msg, {pad, 1024}, Msg]),
|
|
ok = Scan([Msg]
|
|
++ [OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)]
|
|
++ [Msg]),
|
|
%% Truncated start of message.
|
|
ok = Scan([{bin, <<21:56, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:48, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:40, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:32, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:24, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:16, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<21:8, "deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<"deadbeefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<"beefdeadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<"deadbeef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<"beef", "hello", 255>>}]),
|
|
ok = Scan([{bin, <<"hello", 255>>}]),
|
|
ok = Scan([{bin, <<255>>}]),
|
|
%% Truncated end of message (unlikely).
|
|
ok = Scan([{bin, <<255>>}]),
|
|
ok = Scan([{bin, <<255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255, 255>>}]),
|
|
ok = Scan([{bin, <<15:64, "deadbeefdeadbee">>}]),
|
|
ok = Scan([{bin, <<16:64, "deadbeefdeadbeef">>}]),
|
|
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 0>>}]),
|
|
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255>>}]),
|
|
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255, 254>>}]),
|
|
%% Messages with no content.
|
|
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
|
|
ok = Scan([{msg, gen_id(), <<>>}]),
|
|
%% Tricky messages.
|
|
%%
|
|
%% These only get properly detected when the index is populated.
|
|
%% In this test case we simulate the index with a fun.
|
|
TrickyScan = fun (Blocks, Expected, Fun) ->
|
|
Path = gen_msg_file(Config, Blocks),
|
|
Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun),
|
|
case Result of
|
|
Expected -> ok;
|
|
_ -> {expected, Expected, got, Result}
|
|
end
|
|
end,
|
|
ok = TrickyScan(
|
|
[{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}],
|
|
{ok, [{<<"idididididididid">>, 4378, 1}]},
|
|
fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end),
|
|
%% All good!!
|
|
passed.
|
|
|
|
gen_id() ->
|
|
rand:bytes(16).
|
|
|
|
gen_msg() ->
|
|
gen_msg(1024 * 1024).
|
|
|
|
gen_msg(MaxSize) ->
|
|
Bytes = rand:bytes(rand:uniform(MaxSize)),
|
|
%% We remove 255 to avoid false positives. In a running
|
|
%% rabbit node we will not get false positives because
|
|
%% we also check messages against the index.
|
|
<< <<case B of 255 -> 254; _ -> B end>> || <<B>> <= Bytes >>.
|
|
|
|
gen_msg_file(Config, Blocks) ->
|
|
PrivDir = ?config(priv_dir, Config),
|
|
TmpFile = integer_to_list(erlang:unique_integer([positive])),
|
|
Path = filename:join(PrivDir, TmpFile),
|
|
ok = file:write_file(Path, [case Block of
|
|
{bin, Bin} ->
|
|
Bin;
|
|
{pad, Size} ->
|
|
%% Empty space between messages is expected to be zeroes.
|
|
<<0:Size/unit:8>>;
|
|
{msg, MsgId, Msg} ->
|
|
Size = 16 + byte_size(Msg),
|
|
[<<Size:64>>, MsgId, Msg, <<255>>]
|
|
end || Block <- Blocks]),
|
|
Path.
|
|
|
|
gen_result(Blocks) ->
|
|
Messages = gen_result(Blocks, 0, []),
|
|
{ok, Messages}.
|
|
|
|
gen_result([], _, Acc) ->
|
|
Acc;
|
|
gen_result([{bin, Bin}|Tail], Offset, Acc) ->
|
|
gen_result(Tail, Offset + byte_size(Bin), Acc);
|
|
gen_result([{pad, Size}|Tail], Offset, Acc) ->
|
|
gen_result(Tail, Offset + Size, Acc);
|
|
gen_result([{msg, MsgId, Msg}|Tail], Offset, Acc) ->
|
|
Size = 9 + 16 + byte_size(Msg),
|
|
%% Only the first MsgId found is returned when duplicates exist.
|
|
case lists:keymember(MsgId, 1, Acc) of
|
|
false ->
|
|
gen_result(Tail, Offset + Size, [{MsgId, Size, Offset}|Acc]);
|
|
true ->
|
|
gen_result(Tail, Offset + Size, Acc)
|
|
end.
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Backing queue.
|
|
%% -------------------------------------------------------------------
|
|
|
|
setup_backing_queue_test_group(Config) ->
|
|
{ok, MaxJournal} =
|
|
application:get_env(rabbit, queue_index_max_journal_entries),
|
|
application:set_env(rabbit, queue_index_max_journal_entries, 128),
|
|
{ok, Bytes} =
|
|
application:get_env(rabbit, queue_index_embed_msgs_below),
|
|
rabbit_ct_helpers:set_config(Config, [
|
|
{rmq_queue_index_max_journal_entries, MaxJournal},
|
|
{rmq_queue_index_embed_msgs_below, Bytes}
|
|
]).
|
|
|
|
teardown_backing_queue_test_group(Config) ->
|
|
%% FIXME: Undo all the setup function did.
|
|
application:set_env(rabbit, queue_index_max_journal_entries,
|
|
?config(rmq_queue_index_max_journal_entries, Config)),
|
|
%% We will have restarted the message store, and thus changed
|
|
%% the order of the children of rabbit_sup. This will cause
|
|
%% problems if there are subsequent failures - see bug 24262.
|
|
ok = restart_app(),
|
|
Config.
|
|
|
|
bq_queue_index(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, bq_queue_index1, [Config]).
|
|
|
|
index_mod() ->
|
|
rabbit_classic_queue_index_v2.
|
|
|
|
bq_queue_index1(_Config) ->
|
|
init_queue_index(),
|
|
IndexMod = index_mod(),
|
|
SegmentSize = IndexMod:next_segment_boundary(0),
|
|
TwoSegs = SegmentSize + SegmentSize,
|
|
MostOfASegment = trunc(SegmentSize*0.75),
|
|
SeqIdsA = lists:seq(0, MostOfASegment-1),
|
|
NextSeqIdA = MostOfASegment,
|
|
SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
|
|
NextSeqIdB = 2 * MostOfASegment + 1,
|
|
SeqIdsC = lists:seq(0, trunc(SegmentSize/2)),
|
|
SeqIdsD = lists:seq(0, SegmentSize*4),
|
|
|
|
VerifyReadWithPublishedFun = fun verify_read_with_published_v2/3,
|
|
|
|
with_empty_test_queue(
|
|
fun (Qi0, QName) ->
|
|
{0, 0, Qi1} = IndexMod:bounds(Qi0, undefined),
|
|
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
|
|
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, NextSeqIdA),
|
|
{ReadA, Qi4} = IndexMod:read(0, SegmentSize, Qi3),
|
|
ok = VerifyReadWithPublishedFun(false, ReadA,
|
|
lists:reverse(SeqIdsMsgIdsA)),
|
|
%% should get length back as 0, as all the msgs were transient
|
|
{0, 0, Qi6} = restart_test_queue(Qi4, QName),
|
|
{NextSeqIdA, NextSeqIdA, Qi7} = IndexMod:bounds(Qi6, NextSeqIdA),
|
|
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
|
|
{0, TwoSegs, Qi9} = IndexMod:bounds(Qi8, NextSeqIdB),
|
|
{ReadB, Qi10} = IndexMod:read(0, SegmentSize, Qi9),
|
|
ok = VerifyReadWithPublishedFun(true, ReadB,
|
|
lists:reverse(SeqIdsMsgIdsB)),
|
|
%% should get length back as MostOfASegment
|
|
LenB = length(SeqIdsB),
|
|
BytesB = LenB * 10,
|
|
{LenB, BytesB, Qi12} = restart_test_queue(Qi10, QName),
|
|
{0, TwoSegs, Qi13} = IndexMod:bounds(Qi12, NextSeqIdB),
|
|
Qi15 = case IndexMod of
|
|
rabbit_queue_index ->
|
|
Qi14 = IndexMod:deliver(SeqIdsB, Qi13),
|
|
{ReadC, Qi14b} = IndexMod:read(0, SegmentSize, Qi14),
|
|
ok = VerifyReadWithPublishedFun(true, ReadC,
|
|
lists:reverse(SeqIdsMsgIdsB)),
|
|
Qi14b;
|
|
_ ->
|
|
Qi13
|
|
end,
|
|
{_DeletedSegments, Qi16} = IndexMod:ack(SeqIdsB, Qi15),
|
|
Qi17 = IndexMod:flush(Qi16),
|
|
%% Everything will have gone now because #pubs == #acks
|
|
{NextSeqIdB, NextSeqIdB, Qi18} = IndexMod:bounds(Qi17, NextSeqIdB),
|
|
%% should get length back as 0 because all persistent
|
|
%% msgs have been acked
|
|
{0, 0, Qi19} = restart_test_queue(Qi18, QName),
|
|
Qi19
|
|
end),
|
|
|
|
%% These next bits are just to hit the auto deletion of segment files.
|
|
%% First, partials:
|
|
%% a) partial pub+del+ack, then move to new segment
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
{Qi1, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC,
|
|
false, Qi0),
|
|
Qi2 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver(SeqIdsC, Qi1);
|
|
_ -> Qi1
|
|
end,
|
|
{_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsC, Qi2),
|
|
Qi4 = IndexMod:flush(Qi3),
|
|
{Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize],
|
|
false, Qi4),
|
|
Qi5
|
|
end),
|
|
|
|
%% b) partial pub+del, then move to new segment, then ack all in old segment
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
{Qi1, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC,
|
|
false, Qi0),
|
|
Qi2 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver(SeqIdsC, Qi1);
|
|
_ -> Qi1
|
|
end,
|
|
{Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize],
|
|
false, Qi2),
|
|
{_DeletedSegments, Qi4} = IndexMod:ack(SeqIdsC, Qi3),
|
|
IndexMod:flush(Qi4)
|
|
end),
|
|
|
|
%% c) just fill up several segments of all pubs, then +acks
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
{Qi1, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD,
|
|
false, Qi0),
|
|
Qi2 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver(SeqIdsD, Qi1);
|
|
_ -> Qi1
|
|
end,
|
|
{_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsD, Qi2),
|
|
IndexMod:flush(Qi3)
|
|
end),
|
|
|
|
%% d) get messages in all states to a segment, then flush, then do
|
|
%% the same again, don't flush and read.
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
{Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7],
|
|
false, Qi0),
|
|
Qi2 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver([0,1,4], Qi1);
|
|
_ -> Qi1
|
|
end,
|
|
{_DeletedSegments3, Qi3} = IndexMod:ack([0], Qi2),
|
|
Qi4 = IndexMod:flush(Qi3),
|
|
{Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4),
|
|
Qi6 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver([2,3,5,6], Qi5);
|
|
_ -> Qi5
|
|
end,
|
|
{_DeletedSegments7, Qi7} = IndexMod:ack([1,2,3], Qi6),
|
|
{[], Qi8} = IndexMod:read(0, 4, Qi7),
|
|
{ReadD, Qi9} = IndexMod:read(4, 7, Qi8),
|
|
ok = VerifyReadWithPublishedFun(false, ReadD,
|
|
[Four, Five, Six]),
|
|
{ReadE, Qi10} = IndexMod:read(7, 9, Qi9),
|
|
ok = VerifyReadWithPublishedFun(false, ReadE,
|
|
[Seven, Eight]),
|
|
Qi10
|
|
end),
|
|
|
|
%% e) as for (d), but use terminate instead of read.
|
|
with_empty_test_queue(
|
|
fun (Qi0, QName) ->
|
|
{Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7],
|
|
true, Qi0),
|
|
Qi2 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver([0,1,4], Qi1);
|
|
_ -> Qi1
|
|
end,
|
|
{_DeletedSegments3, Qi3} = IndexMod:ack([0], Qi2),
|
|
{5, 50, Qi4} = restart_test_queue(Qi3, QName),
|
|
{Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4),
|
|
Qi6 = case IndexMod of
|
|
rabbit_queue_index -> IndexMod:deliver([2,3,5,6], Qi5);
|
|
_ -> Qi5
|
|
end,
|
|
{_DeletedSegments7, Qi7} = IndexMod:ack([1,2,3], Qi6),
|
|
{5, 50, Qi8} = restart_test_queue(Qi7, QName),
|
|
Qi8
|
|
end),
|
|
|
|
ok = rabbit_variable_queue:stop(?VHOST),
|
|
{ok, _} = rabbit_variable_queue:start(?VHOST, []),
|
|
|
|
passed.
|
|
|
|
%% The v2 index does not store the MsgId unless required.
|
|
%% We therefore do not check it.
|
|
verify_read_with_published_v2(_Persistent, [], _) ->
|
|
ok;
|
|
verify_read_with_published_v2(Persistent,
|
|
[{_MsgId1, SeqId, _Location, _Props, Persistent}|Read],
|
|
[{SeqId, _MsgId2}|Published]) ->
|
|
verify_read_with_published_v2(Persistent, Read, Published);
|
|
verify_read_with_published_v2(_Persistent, _Read, _Published) ->
|
|
ko.
|
|
|
|
bq_queue_index_props(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, bq_queue_index_props1, [Config]).
|
|
|
|
bq_queue_index_props1(_Config) ->
|
|
IndexMod = index_mod(),
|
|
|
|
with_empty_test_queue(
|
|
fun(Qi0, _QName) ->
|
|
MsgId = rabbit_guid:gen(),
|
|
Props = #message_properties{expiry=12345, size = 10},
|
|
Qi1 = IndexMod:publish(
|
|
MsgId, 0, memory, Props, true, infinity, Qi0),
|
|
{[{MsgId, 0, _, Props, _}], Qi2} =
|
|
IndexMod:read(0, 1, Qi1),
|
|
Qi2
|
|
end),
|
|
|
|
ok = rabbit_variable_queue:stop(?VHOST),
|
|
{ok, _} = rabbit_variable_queue:start(?VHOST, []),
|
|
|
|
passed.
|
|
|
|
v2_delete_segment_file_completely_acked(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, v2_delete_segment_file_completely_acked1, [Config]).
|
|
|
|
v2_delete_segment_file_completely_acked1(_Config) ->
|
|
IndexMod = rabbit_classic_queue_index_v2,
|
|
SegmentSize = IndexMod:next_segment_boundary(0),
|
|
SeqIds = lists:seq(0, SegmentSize - 1),
|
|
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
%% Publish a full segment file.
|
|
{Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0),
|
|
SegmentSize = length(SeqIdsMsgIds),
|
|
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined),
|
|
%% Confirm that the file exists on disk.
|
|
Path = IndexMod:segment_file(0, Qi2),
|
|
true = filelib:is_file(Path),
|
|
%% Ack the full segment file.
|
|
{[0], Qi3} = IndexMod:ack(SeqIds, Qi2),
|
|
%% Confirm that the file was deleted.
|
|
false = filelib:is_file(Path),
|
|
Qi3
|
|
end),
|
|
|
|
passed.
|
|
|
|
v2_delete_segment_file_partially_acked(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, v2_delete_segment_file_partially_acked1, [Config]).
|
|
|
|
v2_delete_segment_file_partially_acked1(_Config) ->
|
|
IndexMod = rabbit_classic_queue_index_v2,
|
|
SegmentSize = IndexMod:next_segment_boundary(0),
|
|
SeqIds = lists:seq(0, SegmentSize div 2),
|
|
SeqIdsLen = length(SeqIds),
|
|
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
%% Publish a partial segment file.
|
|
{Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0),
|
|
SeqIdsLen = length(SeqIdsMsgIds),
|
|
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined),
|
|
%% Confirm that the file exists on disk.
|
|
Path = IndexMod:segment_file(0, Qi2),
|
|
true = filelib:is_file(Path),
|
|
%% Ack the partial segment file.
|
|
{[0], Qi3} = IndexMod:ack(SeqIds, Qi2),
|
|
%% Confirm that the file was deleted.
|
|
false = filelib:is_file(Path),
|
|
Qi3
|
|
end),
|
|
|
|
passed.
|
|
|
|
v2_delete_segment_file_partially_acked_with_holes(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, v2_delete_segment_file_partially_acked_with_holes1, [Config]).
|
|
|
|
v2_delete_segment_file_partially_acked_with_holes1(_Config) ->
|
|
IndexMod = rabbit_classic_queue_index_v2,
|
|
SegmentSize = IndexMod:next_segment_boundary(0),
|
|
SeqIdsA = lists:seq(0, SegmentSize div 2),
|
|
SeqIdsB = lists:seq(11 + SegmentSize div 2, SegmentSize - 1),
|
|
SeqIdsLen = length(SeqIdsA) + length(SeqIdsB),
|
|
|
|
with_empty_test_queue(
|
|
fun (Qi0, _QName) ->
|
|
%% Publish a partial segment file with holes.
|
|
{Qi1, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, true, Qi0),
|
|
{Qi2, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi1),
|
|
SeqIdsLen = length(SeqIdsMsgIdsA) + length(SeqIdsMsgIdsB),
|
|
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, undefined),
|
|
%% Confirm that the file exists on disk.
|
|
Path = IndexMod:segment_file(0, Qi3),
|
|
true = filelib:is_file(Path),
|
|
%% Ack the partial segment file with holes.
|
|
{[], Qi4} = IndexMod:ack(SeqIdsA, Qi3),
|
|
{[0], Qi5} = IndexMod:ack(SeqIdsB, Qi4),
|
|
%% Confirm that the file was deleted.
|
|
false = filelib:is_file(Path),
|
|
Qi5
|
|
end),
|
|
|
|
passed.
|
|
|
|
bq_variable_queue_delete_msg_store_files_callback(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, bq_variable_queue_delete_msg_store_files_callback1, [Config]).
|
|
|
|
bq_variable_queue_delete_msg_store_files_callback1(Config) ->
|
|
ok = restart_msg_store_empty(),
|
|
QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
|
|
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
|
|
QName = amqqueue:get_name(Q),
|
|
QPid = amqqueue:get_pid(Q),
|
|
Payload = <<0:8388608>>, %% 1MB
|
|
Count = 30,
|
|
QTState = publish_and_confirm(Q, Payload, Count),
|
|
|
|
{ok, Limiter} = rabbit_limiter:start_link(no_id),
|
|
|
|
CountMinusOne = Count - 1,
|
|
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} =
|
|
rabbit_amqqueue:basic_get(Q, true, Limiter,
|
|
<<"bq_variable_queue_delete_msg_store_files_callback1">>,
|
|
QTState),
|
|
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
|
|
|
|
%% give the queue a second to receive the close_fds callback msg
|
|
timer:sleep(1000),
|
|
|
|
rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
|
|
passed.
|
|
|
|
bq_queue_recover(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, bq_queue_recover1, [Config]).
|
|
|
|
bq_queue_recover1(Config) ->
|
|
init_queue_index(),
|
|
IndexMod = index_mod(),
|
|
Count = 2 * IndexMod:next_segment_boundary(0),
|
|
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
|
|
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
|
|
QName = amqqueue:get_name(Q),
|
|
QPid = amqqueue:get_pid(Q),
|
|
QT = publish_and_confirm(Q, <<>>, Count),
|
|
SupPid = get_queue_sup_pid(Q),
|
|
true = is_pid(SupPid),
|
|
exit(SupPid, kill),
|
|
exit(QPid, kill),
|
|
MRef = erlang:monitor(process, QPid),
|
|
receive {'DOWN', MRef, process, QPid, _Info} -> ok
|
|
after ?TIMEOUT -> exit(timeout_waiting_for_queue_death)
|
|
end,
|
|
rabbit_amqqueue:stop(?VHOST),
|
|
{Recovered, []} = rabbit_amqqueue:recover(?VHOST),
|
|
rabbit_amqqueue:start(Recovered),
|
|
{ok, Limiter} = rabbit_limiter:start_link(no_id),
|
|
rabbit_amqqueue:with_or_die(
|
|
QName,
|
|
fun (Q1) when ?is_amqqueue(Q1) ->
|
|
QPid1 = amqqueue:get_pid(Q1),
|
|
CountMinusOne = Count - 1,
|
|
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} =
|
|
rabbit_amqqueue:basic_get(Q1, false, Limiter,
|
|
<<"bq_queue_recover1">>, QT),
|
|
exit(QPid1, shutdown),
|
|
VQ1 = variable_queue_init(Q, true),
|
|
{{_Msg1, true, _AckTag1}, VQ2} =
|
|
rabbit_variable_queue:fetch(true, VQ1),
|
|
CountMinusOne = rabbit_variable_queue:len(VQ2),
|
|
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
|
|
ok = rabbit_amqqueue:internal_delete(Q1, <<"acting-user">>)
|
|
end),
|
|
passed.
|
|
|
|
%% Return the PID of the given queue's supervisor.
|
|
get_queue_sup_pid(Q) when ?is_amqqueue(Q) ->
|
|
QName = amqqueue:get_name(Q),
|
|
QPid = amqqueue:get_pid(Q),
|
|
VHost = QName#resource.virtual_host,
|
|
{ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
|
|
Sups = supervisor:which_children(AmqSup),
|
|
get_queue_sup_pid(Sups, QPid).
|
|
|
|
get_queue_sup_pid([{_, SupPid, _, _} | Rest], QueuePid) ->
|
|
WorkerPids = [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)],
|
|
case lists:member(QueuePid, WorkerPids) of
|
|
true -> SupPid;
|
|
false -> get_queue_sup_pid(Rest, QueuePid)
|
|
end;
|
|
get_queue_sup_pid([], _QueuePid) ->
|
|
undefined.
|
|
|
|
variable_queue_partial_segments_delta_thing(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_partial_segments_delta_thing1, [Config]).
|
|
|
|
variable_queue_partial_segments_delta_thing1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_partial_segments_delta_thing2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_partial_segments_delta_thing2(VQ0, _QName) ->
|
|
IndexMod = index_mod(),
|
|
SegmentSize = IndexMod:next_segment_boundary(0),
|
|
HalfSegment = SegmentSize div 2,
|
|
OneAndAHalfSegment = SegmentSize + HalfSegment,
|
|
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
|
|
VQ2 = rabbit_variable_queue:update_rates(VQ1),
|
|
VQ3 = check_variable_queue_status(
|
|
VQ2,
|
|
%% We only have one message in memory because the amount in memory
|
|
%% depends on the consume rate, which is nil in this test.
|
|
[{delta, {delta, 1, OneAndAHalfSegment - 1, 0, OneAndAHalfSegment}},
|
|
{q3, 1},
|
|
{len, OneAndAHalfSegment}]),
|
|
VQ5 = check_variable_queue_status(
|
|
variable_queue_publish(true, 1, VQ3),
|
|
%% one alpha, but it's in the same segment as the deltas
|
|
%% @todo That's wrong now! v1/v2
|
|
[{delta, {delta, 1, OneAndAHalfSegment, 0, OneAndAHalfSegment + 1}},
|
|
{q3, 1},
|
|
{len, OneAndAHalfSegment + 1}]),
|
|
{VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
|
|
SegmentSize + HalfSegment + 1, VQ5),
|
|
VQ7 = check_variable_queue_status(
|
|
VQ6,
|
|
%% We only read from delta up to the end of the segment, so
|
|
%% after fetching exactly one segment, we should have no
|
|
%% messages in memory.
|
|
[{delta, {delta, SegmentSize, HalfSegment + 1, 0, OneAndAHalfSegment + 1}},
|
|
{q3, 0},
|
|
{len, HalfSegment + 1}]),
|
|
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
|
|
HalfSegment + 1, VQ7),
|
|
{_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
|
|
%% should be empty now
|
|
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
|
|
VQ10.
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_A(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]).
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
|
|
IndexMod = index_mod(),
|
|
Count = 2 * IndexMod:next_segment_boundary(0),
|
|
VQ1 = variable_queue_publish(true, Count, VQ0),
|
|
VQ2 = variable_queue_publish(false, Count, VQ1),
|
|
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
|
|
Count + Count, VQ2),
|
|
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
|
|
Count, VQ4),
|
|
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
|
|
VQ7 = variable_queue_init(test_amqqueue(QName, true), true),
|
|
{{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
|
|
Count1 = rabbit_variable_queue:len(VQ8),
|
|
VQ9 = variable_queue_publish(false, 1, VQ8),
|
|
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ9),
|
|
{VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11),
|
|
VQ12.
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_B(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]).
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
|
|
VQ2 = variable_queue_publish(false, 4, VQ1),
|
|
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
|
|
{_Guids, VQ4} =
|
|
rabbit_variable_queue:requeue(AckTags, VQ3),
|
|
VQ5 = rabbit_variable_queue:timeout(VQ4),
|
|
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
|
|
VQ7 = variable_queue_init(test_amqqueue(QName, true), true),
|
|
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
|
|
VQ8.
|
|
|
|
variable_queue_drop(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_drop1, [Config]).
|
|
|
|
variable_queue_drop1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_drop2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_drop2(VQ0, _QName) ->
|
|
%% start by sending a messages
|
|
VQ1 = variable_queue_publish(false, 1, VQ0),
|
|
%% drop message with AckRequired = true
|
|
{{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
|
|
true = rabbit_variable_queue:is_empty(VQ2),
|
|
true = AckTag =/= undefinded,
|
|
%% drop again -> empty
|
|
{empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
|
|
%% requeue
|
|
{[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
|
|
%% drop message with AckRequired = false
|
|
{{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
|
|
true = rabbit_variable_queue:is_empty(VQ5),
|
|
VQ5.
|
|
|
|
variable_queue_fold_msg_on_disk(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_fold_msg_on_disk1, [Config]).
|
|
|
|
variable_queue_fold_msg_on_disk1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_fold_msg_on_disk2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
|
|
VQ1 = variable_queue_publish(true, 1, VQ0),
|
|
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
|
|
{ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
|
|
ok, VQ2, AckTags),
|
|
VQ3.
|
|
|
|
variable_queue_dropfetchwhile(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_dropfetchwhile1, [Config]).
|
|
|
|
variable_queue_dropfetchwhile1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_dropfetchwhile2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_dropfetchwhile2(VQ0, _QName) ->
|
|
Count = 10,
|
|
|
|
%% add messages with sequential expiry
|
|
VQ1 = variable_queue_publish(
|
|
false, 1, Count,
|
|
fun (N, Props) -> Props#message_properties{expiry = N} end,
|
|
fun erlang:term_to_binary/1, VQ0),
|
|
|
|
%% fetch the first 5 messages
|
|
{#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
|
|
rabbit_variable_queue:fetchwhile(
|
|
fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
|
|
fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
|
|
{[Msg | MsgAcc], [AckTag | AckAcc]}
|
|
end, {[], []}, VQ1),
|
|
true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
|
|
|
|
%% requeue them
|
|
{_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
|
|
|
|
%% drop the first 5 messages
|
|
{#message_properties{expiry = 6}, VQ4} =
|
|
rabbit_variable_queue:dropwhile(
|
|
fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
|
|
|
|
%% fetch 5
|
|
VQ5 = lists:foldl(fun (N, VQN) ->
|
|
{{Msg, _, _}, VQM} =
|
|
rabbit_variable_queue:fetch(false, VQN),
|
|
true = msg2int(Msg) == N,
|
|
VQM
|
|
end, VQ4, lists:seq(6, Count)),
|
|
|
|
%% should be empty now
|
|
true = rabbit_variable_queue:is_empty(VQ5),
|
|
|
|
VQ5.
|
|
|
|
variable_queue_dropwhile_restart(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_dropwhile_restart1, [Config]).
|
|
|
|
variable_queue_dropwhile_restart1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_dropwhile_restart2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_dropwhile_restart2(VQ0, QName) ->
|
|
Count = 10000,
|
|
|
|
%% add messages with sequential expiry
|
|
VQ1 = variable_queue_publish(
|
|
true, 1, Count,
|
|
fun (N, Props) -> Props#message_properties{expiry = N} end,
|
|
fun erlang:term_to_binary/1, VQ0),
|
|
|
|
%% drop the first 5 messages
|
|
{#message_properties{expiry = 6}, VQ2} =
|
|
rabbit_variable_queue:dropwhile(
|
|
fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),
|
|
|
|
_VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2),
|
|
Terms = variable_queue_read_terms(QName),
|
|
VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),
|
|
|
|
%% fetch 5
|
|
VQ5 = lists:foldl(fun (_, VQN) ->
|
|
{{_, _, _}, VQM} =
|
|
rabbit_variable_queue:fetch(false, VQN),
|
|
VQM
|
|
end, VQ4, lists:seq(6, Count)),
|
|
|
|
%% should be empty now
|
|
true = rabbit_variable_queue:is_empty(VQ5),
|
|
|
|
VQ5.
|
|
|
|
variable_queue_dropwhile_sync_restart(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
|
|
|
|
variable_queue_dropwhile_sync_restart1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_dropwhile_sync_restart2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
|
|
Count = 10000,
|
|
|
|
%% add messages with sequential expiry
|
|
VQ1 = variable_queue_publish(
|
|
true, 1, Count,
|
|
fun (N, Props) -> Props#message_properties{expiry = N} end,
|
|
fun erlang:term_to_binary/1, VQ0),
|
|
|
|
%% drop the first 5 messages
|
|
{#message_properties{expiry = 6}, VQ2} =
|
|
rabbit_variable_queue:dropwhile(
|
|
fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),
|
|
|
|
%% Queue index sync.
|
|
VQ2b = rabbit_variable_queue:handle_pre_hibernate(VQ2),
|
|
|
|
_VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2b),
|
|
Terms = variable_queue_read_terms(QName),
|
|
VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),
|
|
|
|
%% fetch 5
|
|
VQ5 = lists:foldl(fun (_, VQN) ->
|
|
{{_, _, _}, VQM} =
|
|
rabbit_variable_queue:fetch(false, VQN),
|
|
VQM
|
|
end, VQ4, lists:seq(6, Count)),
|
|
|
|
%% should be empty now
|
|
true = rabbit_variable_queue:is_empty(VQ5),
|
|
|
|
VQ5.
|
|
|
|
variable_queue_restart_large_seq_id(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
|
|
|
|
variable_queue_restart_large_seq_id1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_restart_large_seq_id2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_restart_large_seq_id2(VQ0, QName) ->
|
|
Count = 1,
|
|
|
|
%% publish and consume a message
|
|
VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
|
|
%% should be empty now
|
|
true = rabbit_variable_queue:is_empty(VQ1),
|
|
|
|
_VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
|
|
Terms = variable_queue_read_terms(QName),
|
|
Count = proplists:get_value(next_seq_id, Terms),
|
|
|
|
%% set a very high next_seq_id as if 100M messages have been
|
|
%% published and consumed
|
|
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
|
|
|
|
{TInit, VQ3} =
|
|
timer:tc(
|
|
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
|
|
millisecond),
|
|
%% even with a very high next_seq_id start of an empty queue
|
|
%% should be quick (few milliseconds, but let's give it 100ms, to
|
|
%% avoid flaking on slow servers)
|
|
{true, _} = {TInit < 100, TInit},
|
|
|
|
%% should be empty now
|
|
true = rabbit_variable_queue:is_empty(VQ3),
|
|
|
|
VQ3.
|
|
|
|
variable_queue_ack_limiting(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_ack_limiting1, [Config]).
|
|
|
|
variable_queue_ack_limiting1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_ack_limiting2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_ack_limiting2(VQ0, _Config) ->
|
|
%% start by sending in a bunch of messages
|
|
Len = 1024,
|
|
VQ1 = variable_queue_publish(false, Len, VQ0),
|
|
|
|
%% squeeze and relax queue
|
|
Churn = Len div 32,
|
|
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
|
|
|
|
%% update stats
|
|
VQ3 = rabbit_variable_queue:update_rates(VQ2),
|
|
|
|
%% fetch half the messages
|
|
{VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
|
|
|
|
%% We only check the length anymore because
|
|
%% that's the only predictable stats we got.
|
|
VQ5 = check_variable_queue_status(VQ4, [{len, Len div 2}]),
|
|
|
|
VQ5.
|
|
|
|
variable_queue_purge(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_purge1, [Config]).
|
|
|
|
variable_queue_purge1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_purge2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_purge2(VQ0, _Config) ->
|
|
LenDepth = fun (VQ) ->
|
|
{rabbit_variable_queue:len(VQ),
|
|
rabbit_variable_queue:depth(VQ)}
|
|
end,
|
|
VQ1 = variable_queue_publish(false, 10, VQ0),
|
|
{VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
|
|
{4, VQ3} = rabbit_variable_queue:purge(VQ2),
|
|
{0, 6} = LenDepth(VQ3),
|
|
{_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
|
|
{2, 6} = LenDepth(VQ4),
|
|
VQ5 = rabbit_variable_queue:purge_acks(VQ4),
|
|
{2, 2} = LenDepth(VQ5),
|
|
VQ5.
|
|
|
|
variable_queue_requeue(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_requeue1, [Config]).
|
|
|
|
variable_queue_requeue1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_requeue2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_requeue2(VQ0, _Config) ->
|
|
{_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
|
|
variable_queue_with_holes(VQ0),
|
|
Msgs =
|
|
lists:zip(RequeuedMsgs,
|
|
lists:duplicate(length(RequeuedMsgs), true)) ++
|
|
lists:zip(FreshMsgs,
|
|
lists:duplicate(length(FreshMsgs), false)),
|
|
VQ2 = lists:foldl(fun ({I, Requeued}, VQa) ->
|
|
{{M, MRequeued, _}, VQb} =
|
|
rabbit_variable_queue:fetch(true, VQa),
|
|
Requeued = MRequeued, %% assertion
|
|
I = msg2int(M), %% assertion
|
|
VQb
|
|
end, VQ1, Msgs),
|
|
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
|
|
VQ3.
|
|
|
|
%% requeue from ram_pending_ack into q3, move to delta and then empty queue
|
|
variable_queue_requeue_ram_beta(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_requeue_ram_beta1, [Config]).
|
|
|
|
variable_queue_requeue_ram_beta1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_requeue_ram_beta2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_requeue_ram_beta2(VQ0, _Config) ->
|
|
IndexMod = index_mod(),
|
|
Count = IndexMod:next_segment_boundary(0)*2 + 2,
|
|
VQ1 = variable_queue_publish(false, Count, VQ0),
|
|
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
|
|
{Back, Front} = lists:split(Count div 2, AcksR),
|
|
{_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2),
|
|
{_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ3),
|
|
VQ6 = requeue_one_by_one(Front, VQ5),
|
|
{VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6),
|
|
{_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
|
|
VQ8.
|
|
|
|
variable_queue_fold(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_fold1, [Config]).
|
|
|
|
variable_queue_fold1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_fold2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_fold2(VQ0, _Config) ->
|
|
{PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
|
|
variable_queue_with_holes(VQ0),
|
|
Count = rabbit_variable_queue:depth(VQ1),
|
|
Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
|
|
lists:foldl(fun (Cut, VQ2) ->
|
|
test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
|
|
end, VQ1, [0, 1, 2, Count div 2,
|
|
Count - 1, Count, Count + 1, Count * 2]).
|
|
|
|
test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
|
|
{Acc, VQ1} = rabbit_variable_queue:fold(
|
|
fun (M, _, Pending, A) ->
|
|
MInt = msg2int(M),
|
|
Pending = lists:member(MInt, PendingMsgs), %% assert
|
|
case MInt =< Cut of
|
|
true -> {cont, [MInt | A]};
|
|
false -> {stop, A}
|
|
end
|
|
end, [], VQ0),
|
|
Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
|
|
Expected = lists:reverse(Acc), %% assertion
|
|
VQ1.
|
|
|
|
%% same as test_variable_queue_requeue_ram_beta but randomly changing
|
|
%% the queue mode after every step.
|
|
variable_queue_mode_change(Config) ->
|
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
|
?MODULE, variable_queue_mode_change1, [Config]).
|
|
|
|
variable_queue_mode_change1(Config) ->
|
|
with_fresh_variable_queue(
|
|
fun variable_queue_mode_change2/2,
|
|
?config(variable_queue_type, Config)).
|
|
|
|
variable_queue_mode_change2(VQ0, _Config) ->
|
|
IndexMod = index_mod(),
|
|
Count = IndexMod:next_segment_boundary(0)*2 + 2,
|
|
VQ1 = variable_queue_publish(false, Count, VQ0),
|
|
VQ2 = maybe_switch_queue_mode(VQ1),
|
|
{VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
|
|
VQ4 = maybe_switch_queue_mode(VQ3),
|
|
{Back, Front} = lists:split(Count div 2, AcksR),
|
|
{_, VQ5} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ4),
|
|
VQ6 = maybe_switch_queue_mode(VQ5),
|
|
VQ8 = maybe_switch_queue_mode(VQ6),
|
|
{_, VQ9} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ8),
|
|
VQ10 = maybe_switch_queue_mode(VQ9),
|
|
VQ11 = requeue_one_by_one(Front, VQ10),
|
|
VQ12 = maybe_switch_queue_mode(VQ11),
|
|
{VQ13, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ12),
|
|
VQ14 = maybe_switch_queue_mode(VQ13),
|
|
{_, VQ15} = rabbit_variable_queue:ack(AcksAll, VQ14),
|
|
VQ16 = maybe_switch_queue_mode(VQ15),
|
|
VQ16.
|
|
|
|
maybe_switch_queue_mode(VQ) ->
|
|
Mode = random_queue_mode(),
|
|
set_queue_mode(Mode, VQ).
|
|
|
|
random_queue_mode() ->
|
|
Modes = [lazy, default],
|
|
lists:nth(rand:uniform(length(Modes)), Modes).
|
|
|
|
pub_res({_, VQS}) ->
|
|
VQS;
|
|
pub_res(VQS) ->
|
|
VQS.
|
|
|
|
make_publish(IsPersistent, PayloadFun, PropFun, N) ->
|
|
{message(IsPersistent, PayloadFun, N),
|
|
PropFun(N, #message_properties{size = 10}),
|
|
false}.
|
|
|
|
make_publish_delivered(IsPersistent, PayloadFun, PropFun, N) ->
|
|
{message(IsPersistent, PayloadFun, N),
|
|
PropFun(N, #message_properties{size = 10})}.
|
|
|
|
queue_name(Config, Name) ->
|
|
Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)),
|
|
queue_name(Name1).
|
|
|
|
queue_name(Name) ->
|
|
rabbit_misc:r(<<"/">>, queue, Name).
|
|
|
|
test_queue() ->
|
|
queue_name(rabbit_guid:gen()).
|
|
|
|
init_test_queue(QName) ->
|
|
PRef = rabbit_guid:gen(),
|
|
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
|
|
IndexMod = index_mod(),
|
|
Res = IndexMod:recover(
|
|
QName, [], false,
|
|
fun (MsgId) ->
|
|
rabbit_msg_store:contains(MsgId, PersistentClient)
|
|
end,
|
|
fun nop/1, fun nop/1,
|
|
main),
|
|
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
|
|
Res.
|
|
|
|
restart_test_queue(Qi, QName) ->
|
|
IndexMod = index_mod(),
|
|
_ = IndexMod:terminate(?VHOST, [], Qi),
|
|
ok = rabbit_variable_queue:stop(?VHOST),
|
|
{ok, _} = rabbit_variable_queue:start(?VHOST, [QName]),
|
|
init_test_queue(QName).
|
|
|
|
empty_test_queue(QName) ->
|
|
ok = rabbit_variable_queue:stop(?VHOST),
|
|
{ok, _} = rabbit_variable_queue:start(?VHOST, []),
|
|
{0, 0, Qi} = init_test_queue(QName),
|
|
IndexMod = index_mod(),
|
|
_ = IndexMod:delete_and_terminate(Qi),
|
|
ok.
|
|
|
|
unin_empty_test_queue(QName) ->
|
|
{0, 0, Qi} = init_test_queue(QName),
|
|
IndexMod = index_mod(),
|
|
_ = IndexMod:delete_and_terminate(Qi),
|
|
ok.
|
|
|
|
with_empty_test_queue(Fun) ->
|
|
QName = test_queue(),
|
|
ok = empty_test_queue(QName),
|
|
{0, 0, Qi} = init_test_queue(QName),
|
|
IndexMod = index_mod(),
|
|
IndexMod:delete_and_terminate(Fun(Qi, QName)).
|
|
|
|
init_queue_index() ->
|
|
%% We must set the segment entry count in the process dictionary
|
|
%% for tests that call the v1 queue index directly to have a correct
|
|
%% value.
|
|
put(segment_entry_count, 2048),
|
|
ok.
|
|
|
|
restart_app() ->
|
|
rabbit:stop(),
|
|
rabbit:start().
|
|
|
|
queue_index_publish(SeqIds, Persistent, Qi) ->
|
|
IndexMod = index_mod(),
|
|
Ref = rabbit_guid:gen(),
|
|
MsgStore = case Persistent of
|
|
true -> ?PERSISTENT_MSG_STORE;
|
|
false -> ?TRANSIENT_MSG_STORE
|
|
end,
|
|
MSCState = msg_store_client_init(MsgStore, Ref),
|
|
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
|
|
lists:foldl(
|
|
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
|
|
MsgId = rabbit_guid:gen(),
|
|
QiM = IndexMod:publish(
|
|
MsgId, SeqId, rabbit_msg_store,
|
|
#message_properties{size = 10},
|
|
Persistent, infinity, QiN),
|
|
ok = rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState),
|
|
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
|
|
end, {Qi, []}, SeqIds),
|
|
%% do this just to force all of the publishes through to the msg_store:
|
|
true = rabbit_msg_store:contains(LastMsgIdWritten, MSCState),
|
|
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
|
|
{A, B}.
|
|
|
|
nop(_) -> ok.
|
|
nop(_, _) -> ok.
|
|
|
|
msg_store_client_init(MsgStore, Ref) ->
|
|
rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined).
|
|
|
|
variable_queue_init(Q, Recover) ->
|
|
rabbit_variable_queue:init(
|
|
Q, case Recover of
|
|
true -> non_clean_shutdown;
|
|
false -> new;
|
|
Terms -> Terms
|
|
end, fun nop/2, fun nop/1, fun nop/1).
|
|
|
|
variable_queue_read_terms(QName) ->
|
|
#resource { kind = queue,
|
|
virtual_host = VHost,
|
|
name = Name } = QName,
|
|
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
|
|
DirName = rabbit_misc:format("~.36B", [Num]),
|
|
{ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
|
|
Terms.
|
|
|
|
publish_and_confirm(Q, Payload, Count) ->
|
|
Seqs = lists:seq(1, Count),
|
|
QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()),
|
|
QTState =
|
|
lists:foldl(
|
|
fun (Seq, Acc0) ->
|
|
BMsg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
|
|
<<>>, #'P_basic'{delivery_mode = 2},
|
|
Payload),
|
|
Content = BMsg#basic_message.content,
|
|
Ex = BMsg#basic_message.exchange_name,
|
|
{ok, Msg} = mc_amqpl:message(Ex, <<>>, Content),
|
|
Options = #{correlation => Seq},
|
|
{ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Msg,
|
|
Options, Acc0),
|
|
Acc
|
|
end, QTState0, Seqs),
|
|
wait_for_confirms(sets:from_list(Seqs, [{version, 2}])),
|
|
QTState.
|
|
|
|
wait_for_confirms(Unconfirmed) ->
|
|
case sets:is_empty(Unconfirmed) of
|
|
true -> ok;
|
|
false ->
|
|
receive
|
|
{'$gen_cast', {queue_event, _QName, {confirm, Confirmed, _}}} ->
|
|
wait_for_confirms(
|
|
sets:subtract(
|
|
Unconfirmed, sets:from_list(Confirmed, [{version, 2}])))
|
|
after ?TIMEOUT ->
|
|
flush(),
|
|
exit(timeout_waiting_for_confirm)
|
|
end
|
|
end.
|
|
|
|
with_fresh_variable_queue(Fun, Mode) ->
|
|
Ref = make_ref(),
|
|
Me = self(),
|
|
%% Run in a separate process since rabbit_msg_store will send
|
|
%% bump_credit messages and we want to ignore them
|
|
spawn_link(fun() ->
|
|
QName = test_queue(),
|
|
ok = unin_empty_test_queue(QName),
|
|
VQ = variable_queue_init(test_amqqueue(QName, true), false),
|
|
S0 = variable_queue_status(VQ),
|
|
assert_props(S0, [{q1, 0}, {q2, 0},
|
|
{delta,
|
|
{delta, undefined, 0, 0, undefined}},
|
|
{q3, 0}, {q4, 0},
|
|
{len, 0}]),
|
|
VQ1 = set_queue_mode(Mode, VQ),
|
|
try
|
|
_ = rabbit_variable_queue:delete_and_terminate(
|
|
shutdown, Fun(VQ1, QName)),
|
|
Me ! Ref
|
|
catch
|
|
Type:Error:Stacktrace ->
|
|
Me ! {Ref, Type, Error, Stacktrace}
|
|
end
|
|
end),
|
|
receive
|
|
Ref -> ok;
|
|
{Ref, Type, Error, ST} -> exit({Type, Error, ST})
|
|
end,
|
|
passed.
|
|
|
|
set_queue_mode(Mode, VQ) ->
|
|
rabbit_variable_queue:set_queue_mode(Mode, VQ).
|
|
|
|
variable_queue_publish(IsPersistent, Count, VQ) ->
|
|
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
|
|
|
|
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
|
|
variable_queue_publish(IsPersistent, 1, Count, PropFun,
|
|
fun (_N) -> <<>> end, VQ).
|
|
|
|
variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
|
|
variable_queue_wait_for_shuffling_end(
|
|
lists:foldl(
|
|
fun (N, VQN) ->
|
|
Msg = message(IsPersistent, PayloadFun, N),
|
|
rabbit_variable_queue:publish(
|
|
Msg,
|
|
PropFun(N, #message_properties{size = 10}),
|
|
false, self(), VQN)
|
|
end, VQ, lists:seq(Start, Start + Count - 1))).
|
|
|
|
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
|
|
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
|
|
Rem = Len - N,
|
|
{{Msg, IsDelivered, AckTagN}, VQM} =
|
|
rabbit_variable_queue:fetch(true, VQN),
|
|
IsPersistent = mc:is_persistent(Msg),
|
|
Rem = rabbit_variable_queue:len(VQM),
|
|
{VQM, [AckTagN | AckTagsAcc]}
|
|
end, {VQ, []}, lists:seq(1, Count)).
|
|
|
|
test_amqqueue(QName, Durable) ->
|
|
rabbit_amqqueue:pseudo_queue(QName, self(), Durable).
|
|
|
|
assert_prop(List, Prop, Value) ->
|
|
case proplists:get_value(Prop, List)of
|
|
Value -> ok;
|
|
_ -> {exit, Prop, exp, Value, List}
|
|
end.
|
|
|
|
assert_props(List, PropVals) ->
|
|
Res = [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals],
|
|
case lists:usort(Res) of
|
|
[ok] -> ok;
|
|
Error -> error(Error -- [ok])
|
|
end.
|
|
|
|
publish_fetch_and_ack(0, _Len, VQ0) ->
|
|
VQ0;
|
|
publish_fetch_and_ack(N, Len, VQ0) ->
|
|
VQ1 = variable_queue_publish(false, 1, VQ0),
|
|
{{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
|
|
Len = rabbit_variable_queue:len(VQ2),
|
|
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
|
|
publish_fetch_and_ack(N-1, Len, VQ3).
|
|
|
|
variable_queue_status(VQ) ->
|
|
Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status],
|
|
[{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++
|
|
rabbit_variable_queue:info(backing_queue_status, VQ).
|
|
|
|
variable_queue_wait_for_shuffling_end(VQ) ->
|
|
case credit_flow:blocked() of
|
|
false -> VQ;
|
|
true ->
|
|
receive
|
|
{bump_credit, Msg} ->
|
|
credit_flow:handle_bump_msg(Msg),
|
|
variable_queue_wait_for_shuffling_end(
|
|
rabbit_variable_queue:resume(VQ))
|
|
end
|
|
end.
|
|
|
|
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
|
|
binary_to_term(list_to_binary(lists:reverse(P)));
|
|
msg2int(Msg) ->
|
|
#content{payload_fragments_rev = P} = mc:protocol_state(Msg),
|
|
binary_to_term(list_to_binary(lists:reverse(P))).
|
|
|
|
ack_subset(AckSeqs, Interval, Rem) ->
|
|
lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
|
|
|
|
requeue_one_by_one(Acks, VQ) ->
|
|
lists:foldl(fun (AckTag, VQN) ->
|
|
{_MsgId, VQM} = rabbit_variable_queue:requeue(
|
|
[AckTag], VQN),
|
|
VQM
|
|
end, VQ, Acks).
|
|
|
|
%% Create a vq with messages in q1, delta, and q3, and holes (in the
|
|
%% form of pending acks) in the latter two.
|
|
variable_queue_with_holes(VQ0) ->
|
|
Interval = 2048, %% should match vq:IO_BATCH_SIZE
|
|
IndexMod = index_mod(),
|
|
Count = IndexMod:next_segment_boundary(0)*2 + 2 * Interval,
|
|
Seq = lists:seq(1, Count),
|
|
VQ1 = variable_queue_publish(
|
|
false, 1, Count,
|
|
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ0),
|
|
{VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
|
|
Acks = lists:reverse(AcksR),
|
|
AckSeqs = lists:zip(Acks, Seq),
|
|
[{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] =
|
|
[lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]],
|
|
%% we requeue in three phases in order to exercise requeuing logic
|
|
%% in various vq states
|
|
{_MsgIds, VQ4} = rabbit_variable_queue:requeue(
|
|
Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
|
|
VQ5 = requeue_one_by_one(Subset1, VQ4),
|
|
%% by now we have some messages (and holes) in delta
|
|
VQ6 = requeue_one_by_one(Subset2, VQ5),
|
|
%% add the q1 tail
|
|
VQ8 = variable_queue_publish(
|
|
true, Count + 1, Interval,
|
|
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ6),
|
|
%% assertions
|
|
vq_with_holes_assertions(VQ8),
|
|
Depth = Count + Interval,
|
|
Depth = rabbit_variable_queue:depth(VQ8),
|
|
Len = Depth - length(Subset3),
|
|
Len = rabbit_variable_queue:len(VQ8),
|
|
|
|
{Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
|
|
|
|
vq_with_holes_assertions(VQ) ->
|
|
[false =
|
|
case V of
|
|
{delta, _, 0, _, _} -> true;
|
|
0 -> true;
|
|
_ -> false
|
|
end || {K, V} <- variable_queue_status(VQ),
|
|
lists:member(K, [delta, q3])].
|
|
|
|
check_variable_queue_status(VQ0, Props) ->
|
|
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
|
|
S = variable_queue_status(VQ1),
|
|
assert_props(S, Props),
|
|
VQ1.
|
|
|
|
flush() ->
|
|
receive
|
|
Any ->
|
|
ct:pal("flush ~tp", [Any]),
|
|
flush()
|
|
after 0 ->
|
|
ok
|
|
end.
|
|
|
|
message(IsPersistent, PayloadFun, N) ->
|
|
#basic_message{content = Content,
|
|
exchange_name = Ex,
|
|
id = Id} =
|
|
rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
|
|
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
|
|
true -> 2;
|
|
false -> 1
|
|
end},
|
|
PayloadFun(N)),
|
|
{ok, Msg} = mc_amqpl:message(Ex, <<>>, Content, #{id => Id}),
|
|
Msg.
|