346 lines
12 KiB
Erlang
346 lines
12 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
%% @todo This module also needs to be updated when variable queue changes.
|
|
-module(channel_operation_timeout_test_queue).
|
|
|
|
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
|
|
purge/1, purge_acks/1,
|
|
publish/5, publish_delivered/4,
|
|
discard/3, drain_confirmed/1,
|
|
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
|
|
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
|
|
update_rates/1, needs_timeout/1, timeout/1,
|
|
handle_pre_hibernate/1, resume/1, msg_rates/1,
|
|
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
|
|
set_queue_version/2,
|
|
start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).
|
|
|
|
%%----------------------------------------------------------------------------
|
|
%% This test backing queue follows the variable queue implementation, with
|
|
%% the exception that it will introduce infinite delays on some operations if
|
|
%% the test message has been published, and is awaiting acknowledgement in the
|
|
%% queue index. Test message is "timeout_test_msg!".
|
|
%%
|
|
%%----------------------------------------------------------------------------
|
|
|
|
-behaviour(rabbit_backing_queue).
|
|
|
|
-record(vqstate,
|
|
{ q1,
|
|
q2,
|
|
delta,
|
|
q3,
|
|
q4,
|
|
next_seq_id,
|
|
%% seq_id() of first undelivered message
|
|
%% everything before this seq_id() was delivered at least once
|
|
next_deliver_seq_id,
|
|
ram_pending_ack, %% msgs using store, still in RAM
|
|
disk_pending_ack, %% msgs in store, paged out
|
|
qi_pending_ack, %% msgs using qi, *can't* be paged out
|
|
index_mod,
|
|
index_state,
|
|
store_state,
|
|
msg_store_clients,
|
|
durable,
|
|
transient_threshold,
|
|
qi_embed_msgs_below,
|
|
|
|
len, %% w/o unacked
|
|
bytes, %% w/o unacked
|
|
unacked_bytes,
|
|
persistent_count, %% w unacked
|
|
persistent_bytes, %% w unacked
|
|
delta_transient_bytes, %%
|
|
|
|
target_ram_count,
|
|
ram_msg_count, %% w/o unacked
|
|
ram_msg_count_prev,
|
|
ram_ack_count_prev,
|
|
ram_bytes, %% w unacked
|
|
out_counter,
|
|
in_counter,
|
|
rates,
|
|
msgs_on_disk,
|
|
msg_indices_on_disk,
|
|
unconfirmed,
|
|
confirmed,
|
|
ack_out_counter,
|
|
ack_in_counter,
|
|
%% Unlike the other counters these two do not feed into
|
|
%% #rates{} and get reset
|
|
disk_read_count,
|
|
disk_write_count,
|
|
|
|
io_batch_size,
|
|
|
|
%% default queue (or lazy queue from 3.6 to 3.11)
|
|
mode,
|
|
version = 1,
|
|
%% Fast path for confirms handling. Instead of having
|
|
%% index/store keep track of confirms separately and
|
|
%% doing intersect/subtract/union we just put the messages
|
|
%% here and on sync move them to 'confirmed'.
|
|
%%
|
|
%% Note: This field used to be 'memory_reduction_run_count'.
|
|
unconfirmed_simple,
|
|
%% Queue data is grouped by VHost. We need to store it
|
|
%% to work with queue index.
|
|
virtual_host,
|
|
waiting_bump = false
|
|
}).
|
|
|
|
-record(rates, { in, out, ack_in, ack_out, timestamp }).
|
|
|
|
-record(msg_status,
|
|
{ seq_id,
|
|
msg_id,
|
|
msg,
|
|
is_persistent,
|
|
is_delivered,
|
|
msg_location, %% ?IN_SHARED_STORE | ?IN_QUEUE_STORE | ?IN_QUEUE_INDEX | ?IN_MEMORY
|
|
index_on_disk,
|
|
persist_to,
|
|
msg_props
|
|
}).
|
|
|
|
-record(delta,
|
|
{ start_seq_id, %% start_seq_id is inclusive
|
|
count,
|
|
transient,
|
|
end_seq_id %% end_seq_id is exclusive
|
|
}).
|
|
|
|
|
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
|
-define(QUEUE, lqueue).
|
|
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
-type seq_id() :: non_neg_integer().
|
|
|
|
-type rates() :: #rates { in :: float(),
|
|
out :: float(),
|
|
ack_in :: float(),
|
|
ack_out :: float(),
|
|
timestamp :: rabbit_types:timestamp()}.
|
|
|
|
-type delta() :: #delta { start_seq_id :: non_neg_integer(),
|
|
count :: non_neg_integer(),
|
|
end_seq_id :: non_neg_integer() }.
|
|
|
|
%% The compiler (rightfully) complains that ack() and state() are
|
|
%% unused. For this reason we duplicate a -spec from
|
|
%% rabbit_backing_queue with the only intent being to remove
|
|
%% warnings. The problem here is that we can't parameterise the BQ
|
|
%% behaviour by these two types as we would like to. We still leave
|
|
%% these here for documentation purposes.
|
|
-type ack() :: seq_id().
|
|
-type state() :: #vqstate {
|
|
q1 :: ?QUEUE:?QUEUE(),
|
|
q2 :: ?QUEUE:?QUEUE(),
|
|
delta :: delta(),
|
|
q3 :: ?QUEUE:?QUEUE(),
|
|
q4 :: ?QUEUE:?QUEUE(),
|
|
next_seq_id :: seq_id(),
|
|
ram_pending_ack :: map(),
|
|
disk_pending_ack :: map(),
|
|
qi_pending_ack :: map(),
|
|
index_state :: any(),
|
|
msg_store_clients :: 'undefined' | {{any(), binary()},
|
|
{any(), binary()}},
|
|
durable :: boolean(),
|
|
transient_threshold :: non_neg_integer(),
|
|
qi_embed_msgs_below :: non_neg_integer(),
|
|
|
|
len :: non_neg_integer(),
|
|
bytes :: non_neg_integer(),
|
|
unacked_bytes :: non_neg_integer(),
|
|
|
|
persistent_count :: non_neg_integer(),
|
|
persistent_bytes :: non_neg_integer(),
|
|
|
|
target_ram_count :: non_neg_integer() | 'infinity',
|
|
ram_msg_count :: non_neg_integer(),
|
|
ram_msg_count_prev :: non_neg_integer(),
|
|
ram_ack_count_prev :: non_neg_integer(),
|
|
ram_bytes :: non_neg_integer(),
|
|
out_counter :: non_neg_integer(),
|
|
in_counter :: non_neg_integer(),
|
|
rates :: rates(),
|
|
msgs_on_disk :: gb_sets:set(),
|
|
msg_indices_on_disk :: gb_sets:set(),
|
|
unconfirmed :: gb_sets:set(),
|
|
confirmed :: gb_sets:set(),
|
|
ack_out_counter :: non_neg_integer(),
|
|
ack_in_counter :: non_neg_integer(),
|
|
disk_read_count :: non_neg_integer(),
|
|
disk_write_count :: non_neg_integer(),
|
|
|
|
io_batch_size :: pos_integer(),
|
|
mode :: 'default' | 'lazy',
|
|
virtual_host :: rabbit_types:vhost() }.
|
|
%% Duplicated from rabbit_backing_queue
|
|
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
|
|
|
|
%%----------------------------------------------------------------------------
|
|
%% Public API
|
|
%%----------------------------------------------------------------------------
|
|
|
|
start(VHost, DurableQueues) ->
|
|
rabbit_variable_queue:start(VHost, DurableQueues).
|
|
|
|
stop(VHost) ->
|
|
rabbit_variable_queue:stop(VHost).
|
|
|
|
init(Queue, Recover, Callback) ->
|
|
rabbit_variable_queue:init(Queue, Recover, Callback).
|
|
|
|
terminate(Reason, State) ->
|
|
rabbit_variable_queue:terminate(Reason, State).
|
|
|
|
delete_and_terminate(Reason, State) ->
|
|
rabbit_variable_queue:delete_and_terminate(Reason, State).
|
|
|
|
delete_crashed(Q) ->
|
|
rabbit_variable_queue:delete_crashed(Q).
|
|
|
|
purge(State = #vqstate { ram_pending_ack= QPA }) ->
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:purge(State);
|
|
%% For v3.9.x and below because the state has changed.
|
|
purge(State) ->
|
|
QPA = element(10, State),
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:purge(State).
|
|
|
|
purge_acks(State) ->
|
|
rabbit_variable_queue:purge_acks(State).
|
|
|
|
publish(Msg, MsgProps, IsDelivered, ChPid, State) ->
|
|
rabbit_variable_queue:publish(Msg, MsgProps, IsDelivered, ChPid, State).
|
|
|
|
publish_delivered(Msg, MsgProps, ChPid, State) ->
|
|
rabbit_variable_queue:publish_delivered(Msg, MsgProps, ChPid, State).
|
|
|
|
discard(_MsgId, _ChPid, State) -> State.
|
|
|
|
drain_confirmed(State) ->
|
|
rabbit_variable_queue:drain_confirmed(State).
|
|
|
|
dropwhile(Pred, State) ->
|
|
rabbit_variable_queue:dropwhile(Pred, State).
|
|
|
|
fetchwhile(Pred, Fun, Acc, State) ->
|
|
rabbit_variable_queue:fetchwhile(Pred, Fun, Acc, State).
|
|
|
|
fetch(AckRequired, State) ->
|
|
rabbit_variable_queue:fetch(AckRequired, State).
|
|
|
|
drop(AckRequired, State) ->
|
|
rabbit_variable_queue:drop(AckRequired, State).
|
|
|
|
ack(List, State) ->
|
|
rabbit_variable_queue:ack(List, State).
|
|
|
|
requeue(AckTags, #vqstate { ram_pending_ack = QPA } = State) ->
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:requeue(AckTags, State);
|
|
%% For v3.9.x and below because the state has changed.
|
|
requeue(AckTags, State) ->
|
|
QPA = element(10, State),
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:requeue(AckTags, State).
|
|
|
|
ackfold(MsgFun, Acc, State, AckTags) ->
|
|
rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags).
|
|
|
|
fold(Fun, Acc, State) ->
|
|
rabbit_variable_queue:fold(Fun, Acc, State).
|
|
|
|
len(#vqstate { ram_pending_ack = QPA } = State) ->
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:len(State);
|
|
%% For v3.9.x and below because the state has changed.
|
|
len(State) ->
|
|
QPA = element(10, State),
|
|
maybe_delay(QPA),
|
|
rabbit_variable_queue:len(State).
|
|
|
|
is_empty(State) -> 0 == len(State).
|
|
|
|
depth(State) ->
|
|
rabbit_variable_queue:depth(State).
|
|
|
|
update_rates(State) ->
|
|
rabbit_variable_queue:update_rates(State).
|
|
|
|
needs_timeout(State) ->
|
|
rabbit_variable_queue:needs_timeout(State).
|
|
|
|
timeout(State) ->
|
|
rabbit_variable_queue:timeout(State).
|
|
|
|
handle_pre_hibernate(State) ->
|
|
rabbit_variable_queue:handle_pre_hibernate(State).
|
|
|
|
handle_info(Msg, State) ->
|
|
rabbit_variable_queue:handle_info(Msg, State).
|
|
|
|
resume(State) -> rabbit_variable_queue:resume(State).
|
|
|
|
msg_rates(State) ->
|
|
rabbit_variable_queue:msg_rates(State).
|
|
|
|
info(Info, State) ->
|
|
rabbit_variable_queue:info(Info, State).
|
|
|
|
invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State).
|
|
|
|
is_duplicate(Msg, State) -> rabbit_variable_queue:is_duplicate(Msg, State).
|
|
|
|
set_queue_mode(Mode, State) ->
|
|
rabbit_variable_queue:set_queue_mode(Mode, State).
|
|
|
|
set_queue_version(Version, State) ->
|
|
rabbit_variable_queue:set_queue_version(Version, State).
|
|
|
|
zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) ->
|
|
rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State).
|
|
|
|
%% Delay
|
|
maybe_delay(QPA) ->
|
|
%% The structure for ram_pending_acks has changed to maps in 3.12.
|
|
Values = case is_map(QPA) of
|
|
true -> maps:values(QPA);
|
|
false -> gb_trees:values(QPA)
|
|
end,
|
|
case is_timeout_test(Values) of
|
|
true -> receive
|
|
%% The queue received an EXIT message, it's probably the
|
|
%% node being stopped with "rabbitmqctl stop". Thus, abort
|
|
%% the wait and requeue the EXIT message.
|
|
{'EXIT', _, shutdown} = ExitMsg -> self() ! ExitMsg,
|
|
void
|
|
after infinity -> void
|
|
end;
|
|
_ -> void
|
|
end.
|
|
|
|
is_timeout_test([]) -> false;
|
|
is_timeout_test([#msg_status{
|
|
msg = #basic_message{
|
|
content = #content{
|
|
payload_fragments_rev = PFR}}}|Rem]) ->
|
|
case lists:member(?TIMEOUT_TEST_MSG, PFR) of
|
|
T = true -> T;
|
|
_ -> is_timeout_test(Rem)
|
|
end;
|
|
is_timeout_test([_|Rem]) -> is_timeout_test(Rem).
|