Add at-least-once dead-lettering for quorum queues and message TTL

This commit is contained in:
parent e3ccefbf39
commit 1c17773c91
@@ -778,6 +778,7 @@ declare_args() ->
     {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
     {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
     {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
     {<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
     {<<"x-max-length">>, fun check_non_neg_int_arg/2},
     {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
     {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
@@ -946,6 +947,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
check_dlxrk_arg(_Val, _Args) ->
    {error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_DLX_STRATEGIES, [<<"at-most-once">>, <<"at-least-once">>]).
check_dlxstrategy_arg({longstr, Val}, _Args) ->
    case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
        true -> ok;
        false -> {error, invalid_dlx_strategy}
    end;
check_dlxstrategy_arg({Type, _}, _Args) ->
    {error, {unacceptable_type, Type}};
check_dlxstrategy_arg(Val, _Args) when is_binary(Val) ->
    case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
        true -> ok;
        false -> {error, invalid_dlx_strategy}
    end;
check_dlxstrategy_arg(_Val, _Args) ->
    {error, invalid_dlx_strategy}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
    case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
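The hunk above only validates the new `x-dead-letter-strategy` queue argument. As a point of reference (not part of this commit), a minimal sketch of declaring a quorum queue that opts into at-least-once dead-lettering via the Erlang AMQP client could look as follows; the queue name, exchange name, and TTL value are made up for illustration, and the channel `Ch` is assumed to be already open:

```erlang
%% Sketch only: declare a quorum queue whose dead-lettered messages are
%% forwarded to the (pre-existing) exchange <<"my-dlx">> at least once.
-include_lib("amqp_client/include/amqp_client.hrl").

declare_qq_with_alo_dlx(Ch) ->
    Args = [{<<"x-queue-type">>,           longstr,   <<"quorum">>},
            {<<"x-dead-letter-exchange">>, longstr,   <<"my-dlx">>},
            {<<"x-dead-letter-strategy">>, longstr,   <<"at-least-once">>},
            {<<"x-message-ttl">>,          signedint, 300000}],
    #'queue.declare_ok'{} =
        amqp_channel:call(Ch, #'queue.declare'{queue     = <<"my-qq">>,
                                               durable   = true,
                                               arguments = Args}).
```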
@@ -1657,8 +1674,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
        {'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
        {'empty', rabbit_queue_type:state()} |
        {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
    rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
    rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates).


-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
@@ -1670,7 +1687,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
        {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
              LimiterActive, ConsumerPrefetchCount, ConsumerTag,
              ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
              ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->

    QName = amqqueue:get_name(Q),
    %% first phase argument validation
@@ -1686,7 +1703,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
             args => Args,
             ok_msg => OkMsg,
             acting_user => ActingUser},
    rabbit_queue_type:consume(Q, Spec, Contexts).
    rabbit_queue_type:consume(Q, Spec, QStates).

-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
                   rabbit_types:username(),
@@ -12,7 +12,8 @@
-export([publish/4, publish/5, publish/1,
         message/3, message/4, properties/1, prepend_table_header/3,
         extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
         header_routes/1, parse_expiration/1, header/2, header/3]).
         header_routes/1, parse_expiration/1, header/2, header/3,
         is_message_persistent/1]).
-export([build_content/2, from_content/1, msg_size/1,
         maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
-export([add_header/4,
@@ -445,8 +445,10 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->

capabilities() ->
    #{unsupported_policies => [ %% Stream policies
                                <<"max-age">>, <<"stream-max-segment-size-bytes">>,
                                <<"queue-leader-locator">>, <<"initial-cluster-size">>],
                                <<"max-age">>, <<"stream-max-segment-size-bytes">>,
                                <<"queue-leader-locator">>, <<"initial-cluster-size">>,
                                %% Quorum policies
                                <<"dead-letter-strategy">>],
      queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
                          <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
                          <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
@@ -7,7 +7,9 @@

-module(rabbit_dead_letter).

-export([publish/5]).
-export([publish/5,
         make_msg/5,
         detect_cycles/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
@@ -39,7 +41,7 @@ make_msg(Msg = #basic_message{content = Content,
                     undefined -> {RoutingKeys, fun (H) -> H end};
                     _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
                 end,
    ReasonBin = list_to_binary(atom_to_list(Reason)),
    ReasonBin = atom_to_binary(Reason),
    TimeSec = os:system_time(seconds),
    PerMsgTTL = per_msg_ttl_header(Content#content.properties),
    HeadersFun2 =
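The `ReasonBin` change above swaps `list_to_binary(atom_to_list(Reason))` for the shorter `atom_to_binary/1` (available since OTP 23, defaulting to UTF-8 encoding); for the reason atoms used here both forms produce the same binary. A quick illustrative shell check:

```erlang
%% Both expressions yield <<"expired">>; atom_to_binary/1 defaults to utf8 (OTP 23+).
true = atom_to_binary(expired) =:= list_to_binary(atom_to_list(expired)).
```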
(File diff suppressed because it is too large)
@@ -2,16 +2,21 @@
%% macros for memory optimised tuple structures
-define(TUPLE(A, B), [A | B]).

-define(DISK_MSG_TAG, '$disk').
% -define(PREFIX_DISK_MSG_TAG, '$prefix_disk').
-define(PREFIX_MEM_MSG_TAG, '$prefix_inmem').
%% We want short atoms since their binary representations will get
%% persisted in a snapshot for every message.
%% '$d' stands for 'disk'.
-define(DISK_MSG_TAG, '$d').
%% '$m' stands for 'memory'.
-define(PREFIX_MEM_MSG_TAG, '$m').

-define(DISK_MSG(Header), [Header | ?DISK_MSG_TAG]).
-define(MSG(Header, RawMsg), [Header | RawMsg]).
-define(INDEX_MSG(Index, Msg), [Index | Msg]).
-define(PREFIX_MEM_MSG(Header), [Header | ?PREFIX_MEM_MSG_TAG]).

% -define(PREFIX_DISK_MSG_TAG, '$prefix_disk').
% -define(PREFIX_DISK_MSG(Header), [?PREFIX_DISK_MSG_TAG | Header]).
% -define(PREFIX_DISK_MSG(Header), ?DISK_MSG(Header)).
-define(PREFIX_MEM_MSG(Header), [?PREFIX_MEM_MSG_TAG | Header]).

-type option(T) :: undefined | T.
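The `?TUPLE`/`?MSG` macros above store two-element pairs as improper lists (`[A | B]`) rather than 2-tuples, and the tag atoms are shortened to `'$d'`/`'$m'`; both tricks shrink what ends up in Ra snapshots. A rough way to see the per-pair saving in the shell, shown only as an illustration (one word per pair on the Erlang VM):

```erlang
%% erts_debug:size/1 returns the heap size in words: a cons cell needs 2 words,
%% a 2-tuple needs 3 (header word + 2 elements); the atoms themselves are immediates.
2 = erts_debug:size([header | raw_msg]),
3 = erts_debug:size({header, raw_msg}).
```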
@@ -32,11 +37,14 @@
%% same process

-type msg_header() :: msg_size() |
                      #{size := msg_size(),
                        delivery_count => non_neg_integer()}.
                      #{size := msg_size(),
                        delivery_count => non_neg_integer(),
                        expiry => milliseconds()}.
%% The message header:
%% delivery_count: the number of unsuccessful delivery attempts.
%%                 A non-zero value indicates a previous attempt.
%% expiry: Epoch time in ms when a message expires. Set during enqueue.
%%         Value is determined by per-queue or per-message message TTL.
%% If it only contains the size it can be condensed to an integer only

-type msg() :: ?MSG(msg_header(), raw_msg()) |
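The new `expiry` header key is an absolute timestamp rather than a relative TTL, so expiry checks during checkout only need a comparison against the current time. A hedged sketch of how such a header could be computed at enqueue time and tested later; the function names are illustrative, not necessarily those used by rabbit_fifo:

```erlang
%% Illustrative only: attach an absolute expiry (epoch ms) derived from a
%% per-queue or per-message TTL, and test it later with a simple comparison.
maybe_expiry_header(undefined, Header) ->
    Header;
maybe_expiry_header(TTLms, Header) when is_integer(TTLms), is_map(Header) ->
    Header#{expiry => os:system_time(millisecond) + TTLms}.

is_expired(#{expiry := Expiry}) ->
    os:system_time(millisecond) >= Expiry;
is_expired(_Header) ->
    false.
```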
@@ -122,7 +130,7 @@
-record(enqueuer,
        {next_seqno = 1 :: msg_seqno(),
         % out of order enqueues - sorted list
         pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
         pending = [] :: [{msg_seqno(), ra:index(), milliseconds(), raw_msg()}],
         status = up :: up |
                        suspected_down,
         %% it is useful to have a record of when this was blocked
@@ -137,7 +145,7 @@
        {name :: atom(),
         resource :: rabbit_types:r('queue'),
         release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
         dead_letter_handler :: option(applied_mfa()),
         dead_letter_handler :: option({at_most_once, applied_mfa()} | at_least_once),
         become_leader_handler :: option(applied_mfa()),
         overflow_strategy = drop_head :: drop_head | reject_publish,
         max_length :: option(non_neg_integer()),
@@ -149,6 +157,7 @@
         max_in_memory_length :: option(non_neg_integer()),
         max_in_memory_bytes :: option(non_neg_integer()),
         expires :: undefined | milliseconds(),
         msg_ttl :: undefined | milliseconds(),
         unused_1,
         unused_2
        }).
@@ -166,6 +175,7 @@
         % queue of returned msg_in_ids - when checking out it picks from
         returns = lqueue:new() :: lqueue:lqueue(term()),
         % a counter of enqueues - used to trigger shadow copy points
         % reset to 0 when release_cursor gets stored
         enqueue_count = 0 :: non_neg_integer(),
         % a map containing all the live processes that have ever enqueued
         % a message to this queue as well as a cached value of the smallest
@@ -177,11 +187,19 @@
         % index when there are large gaps but should be faster than gb_trees
         % for normal appending operations as it's backed by a map
         ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
         %% A release cursor is essentially a snapshot without message bodies
         %% (aka. "dehydrated state") taken at time T in order to truncate
         %% the log at some point in the future when all messages that were enqueued
         %% up to time T have been removed (e.g. consumed, dead-lettered, or dropped).
         %% This concept enables snapshots to not contain any message bodies.
         %% Advantage: Smaller snapshots are sent between Ra nodes.
         %% Working assumption: Messages are consumed in a FIFO-ish order because
         %% the log is truncated only until the oldest message.
         release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
                                                          ra:index(), #rabbit_fifo{}}),
         % consumers need to reflect consumer state at time of snapshot
         % needs to be part of snapshot
         consumers = #{} :: #{consumer_id() => #consumer{}},
         consumers = #{} :: #{consumer_id() => consumer()},
         % consumers that require further service are queued here
         % needs to be part of snapshot
         service_queue = priority_queue:new() :: priority_queue:q(),
@@ -194,7 +212,10 @@
         %% overflow calculations).
         %% This is done so that consumers are still served in a deterministic
         %% order on recovery.
         %% TODO Remove this field and store prefix messages in-place. This will
         %% simplify the checkout logic.
         prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
         dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
         msg_bytes_enqueue = 0 :: non_neg_integer(),
         msg_bytes_checkout = 0 :: non_neg_integer(),
         %% waiting consumers, one is picked if the active consumer is cancelled or dies
@@ -209,7 +230,7 @@

-type config() :: #{name := atom(),
                    queue_resource := rabbit_types:r('queue'),
                    dead_letter_handler => applied_mfa(),
                    dead_letter_handler => option({at_most_once, applied_mfa()} | at_least_once),
                    become_leader_handler => applied_mfa(),
                    release_cursor_interval => non_neg_integer(),
                    max_length => non_neg_integer(),
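With the widened `dead_letter_handler` type above, a quorum queue configured for at-least-once dead-lettering would pass the bare atom instead of an MFA tuple. An illustrative (not verbatim) config map in the shape of this `config()` type; the queue name and values are made up:

```erlang
%% Sketch of a config() map selecting at-least-once dead-lettering.
Conf = #{name => my_qq,
         queue_resource => rabbit_misc:r(<<"/">>, queue, <<"my_qq">>),
         dead_letter_handler => at_least_once,   %% or {at_most_once, MFA}
         msg_ttl => 300000,                      %% per-queue message TTL in ms
         release_cursor_interval => 0}.
```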
@@ -220,5 +241,6 @@
                    single_active_consumer_on => boolean(),
                    delivery_limit => non_neg_integer(),
                    expires => non_neg_integer(),
                    msg_ttl => non_neg_integer(),
                    created => non_neg_integer()
                   }.
@@ -531,7 +531,7 @@ update_machine_state(Server, Conf) ->
%% `{internal, AppliedCorrelations, State}' if the event contained an internally
%% handled event such as a notification and a correlation was included with
%% the command (e.g. in a call to `enqueue/3' the correlation terms are returned
%% here.
%% here).
%%
%% `{RaFifoEvent, State}' if the event contained a client message generated by
%% the `rabbit_fifo' state machine such as a delivery.
@@ -0,0 +1,324 @@
-module(rabbit_fifo_dlx).

-include("rabbit_fifo_dlx.hrl").
-include("rabbit_fifo.hrl").

% client API, e.g. for rabbit_fifo_dlx_client
-export([make_checkout/2,
         make_settle/1]).

% called by rabbit_fifo delegating DLX handling to this module
-export([init/0, apply/2, discard/3, overview/1,
         checkout/1, state_enter/4,
         start_worker/2, terminate_worker/1, cleanup/1, purge/1,
         consumer_pid/1, dehydrate/1, normalize/1,
         stat/1]).

%% This module handles the dead letter (DLX) part of the rabbit_fifo state machine.
%% This is a separate module to better unit test and provide separation of concerns.
%% This module maintains its own state:
%% a queue of DLX messages, a single node local DLX consumer, and some stats.
%% The state of this module is included into the rabbit_fifo state because there can only be one Ra state machine.
%% The rabbit_fifo module forwards all DLX commands to this module where we then update the DLX specific state only:
%% e.g. DLX consumer subscribed, adding / removing discarded messages, stats
%%
%% It also runs its own checkout logic sending DLX messages to the DLX consumer.

-record(checkout, {
          consumer :: atom(),
          prefetch :: non_neg_integer()
         }).
-record(settle, {msg_ids :: [msg_id()]}).
-opaque protocol() :: {dlx, #checkout{} | #settle{}}.
-opaque state() :: #?MODULE{}.
-export_type([state/0, protocol/0, reason/0]).

init() ->
    #?MODULE{}.

make_checkout(RegName, NumUnsettled) ->
    {dlx, #checkout{consumer = RegName,
                    prefetch = NumUnsettled
                   }}.

make_settle(MessageIds) when is_list(MessageIds) ->
    {dlx, #settle{msg_ids = MessageIds}}.

overview(#?MODULE{consumer = undefined,
                  msg_bytes = MsgBytes,
                  msg_bytes_checkout = 0,
                  discards = Discards}) ->
    overview0(Discards, #{}, MsgBytes, 0);
overview(#?MODULE{consumer = #dlx_consumer{checked_out = Checked},
                  msg_bytes = MsgBytes,
                  msg_bytes_checkout = MsgBytesCheckout,
                  discards = Discards}) ->
    overview0(Discards, Checked, MsgBytes, MsgBytesCheckout).

overview0(Discards, Checked, MsgBytes, MsgBytesCheckout) ->
    #{num_discarded => lqueue:len(Discards),
      num_discard_checked_out => map_size(Checked),
      discard_message_bytes => MsgBytes,
      discard_checkout_message_bytes => MsgBytesCheckout}.

stat(#?MODULE{consumer = Con,
              discards = Discards,
              msg_bytes = MsgBytes,
              msg_bytes_checkout = MsgBytesCheckout}) ->
    Num0 = lqueue:len(Discards),
    Num = case Con of
              undefined ->
                  Num0;
              #dlx_consumer{checked_out = Checked} ->
                  Num0 + map_size(Checked)
          end,
    Bytes = MsgBytes + MsgBytesCheckout,
    {Num, Bytes}.

apply(#checkout{consumer = RegName,
                prefetch = Prefetch},
      #?MODULE{consumer = undefined} = State0) ->
    State = State0#?MODULE{consumer = #dlx_consumer{registered_name = RegName,
                                                    prefetch = Prefetch}},
    {State, ok};
apply(#checkout{consumer = RegName,
                prefetch = Prefetch},
      #?MODULE{consumer = #dlx_consumer{checked_out = CheckedOutOldConsumer},
               discards = Discards0,
               msg_bytes = Bytes,
               msg_bytes_checkout = BytesCheckout} = State0) ->
    %% Since we allow only a single consumer, the new consumer replaces the old consumer.
    %% All messages checked out to the old consumer need to be returned to the discards queue
    %% such that these messages can be (eventually) re-delivered to the new consumer.
    %% When inserting back into the discards queue, we respect the original order in which messages
    %% were discarded.
    Checked0 = maps:to_list(CheckedOutOldConsumer),
    Checked1 = lists:keysort(1, Checked0),
    {Discards, BytesMoved} = lists:foldr(fun({_Id, {_Reason, IdxMsg} = Msg}, {D, B}) ->
                                                 {lqueue:in_r(Msg, D), B + size_in_bytes(IdxMsg)}
                                         end, {Discards0, 0}, Checked1),
    State = State0#?MODULE{consumer = #dlx_consumer{registered_name = RegName,
                                                    prefetch = Prefetch},
                           discards = Discards,
                           msg_bytes = Bytes + BytesMoved,
                           msg_bytes_checkout = BytesCheckout - BytesMoved},
    {State, ok};
apply(#settle{msg_ids = MsgIds},
      #?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
               msg_bytes_checkout = BytesCheckout} = State0) ->
    Acked = maps:with(MsgIds, Checked),
    AckedRsnMsgs = maps:values(Acked),
    AckedMsgs = lists:map(fun({_Reason, Msg}) -> Msg end, AckedRsnMsgs),
    AckedBytes = lists:foldl(fun(Msg, Bytes) ->
                                     Bytes + size_in_bytes(Msg)
                             end, 0, AckedMsgs),
    Unacked = maps:without(MsgIds, Checked),
    State = State0#?MODULE{consumer = C#dlx_consumer{checked_out = Unacked},
                           msg_bytes_checkout = BytesCheckout - AckedBytes},
    {State, AckedMsgs}.

%%TODO delete delivery_count header to save space?
%% It's not needed anymore.
discard(Msg, Reason, #?MODULE{discards = Discards0,
                              msg_bytes = MsgBytes0} = State) ->
    Discards = lqueue:in({Reason, Msg}, Discards0),
    MsgBytes = MsgBytes0 + size_in_bytes(Msg),
    State#?MODULE{discards = Discards,
                  msg_bytes = MsgBytes}.

checkout(#?MODULE{consumer = undefined,
                  discards = Discards} = State) ->
    case lqueue:is_empty(Discards) of
        true ->
            ok;
        false ->
            rabbit_log:warning("there are dead-letter messages but no dead-letter consumer")
    end,
    {State, []};
checkout(State) ->
    checkout0(checkout_one(State), {[], []}).

checkout0({success, MsgId, {Reason, ?INDEX_MSG(RaftIdx, ?DISK_MSG(Header))}, State}, {InMemMsgs, LogMsgs}) when is_integer(RaftIdx) ->
    DelMsg = {RaftIdx, {Reason, MsgId, Header}},
    SendAcc = {InMemMsgs, [DelMsg|LogMsgs]},
    checkout0(checkout_one(State), SendAcc);
checkout0({success, MsgId, {Reason, ?INDEX_MSG(Idx, ?MSG(Header, Msg))}, State}, {InMemMsgs, LogMsgs}) when is_integer(Idx) ->
    DelMsg = {MsgId, {Reason, Header, Msg}},
    SendAcc = {[DelMsg|InMemMsgs], LogMsgs},
    checkout0(checkout_one(State), SendAcc);
checkout0({success, _MsgId, {_Reason, ?TUPLE(_, _)}, State}, SendAcc) ->
    %% This is a prefix message which means we are recovering from a snapshot.
    %% We know:
    %% 1. This message was already delivered in the past, and
    %% 2. The recovery Raft log ahead of this Raft command will definitely settle this message.
    %% Therefore, here, we just check this message out to the consumer but do not re-deliver this message
    %% so that we will end up with the correct and deterministic state once the whole recovery log replay is completed.
    checkout0(checkout_one(State), SendAcc);
checkout0(#?MODULE{consumer = #dlx_consumer{registered_name = RegName}} = State, SendAcc) ->
    Effects = delivery_effects(whereis(RegName), SendAcc),
    {State, Effects}.

checkout_one(#?MODULE{consumer = #dlx_consumer{checked_out = Checked,
                                               prefetch = Prefetch}} = State) when map_size(Checked) >= Prefetch ->
    State;
checkout_one(#?MODULE{consumer = #dlx_consumer{checked_out = Checked0,
                                               next_msg_id = Next} = Con0} = State0) ->
    case take_next_msg(State0) of
        {{_, Msg} = ReasonMsg, State1} ->
            Checked = maps:put(Next, ReasonMsg, Checked0),
            State2 = State1#?MODULE{consumer = Con0#dlx_consumer{checked_out = Checked,
                                                                 next_msg_id = Next + 1}},
            Bytes = size_in_bytes(Msg),
            State = add_bytes_checkout(Bytes, State2),
            {success, Next, ReasonMsg, State};
        empty ->
            State0
    end.

take_next_msg(#?MODULE{discards = Discards0} = State) ->
    case lqueue:out(Discards0) of
        {empty, _} ->
            empty;
        {{value, ReasonMsg}, Discards} ->
            {ReasonMsg, State#?MODULE{discards = Discards}}
    end.

add_bytes_checkout(Size, #?MODULE{msg_bytes = Bytes,
                                  msg_bytes_checkout = BytesCheckout} = State) ->
    State#?MODULE{msg_bytes = Bytes - Size,
                  msg_bytes_checkout = BytesCheckout + Size}.

size_in_bytes(Msg) ->
    Header = rabbit_fifo:get_msg_header(Msg),
    rabbit_fifo:get_header(size, Header).

%% returns at most one delivery effect because there is only one consumer
delivery_effects(_CPid, {[], []}) ->
    [];
delivery_effects(CPid, {InMemMsgs, []}) ->
    [{send_msg, CPid, {dlx_delivery, lists:reverse(InMemMsgs)}, [ra_event]}];
delivery_effects(CPid, {InMemMsgs, IdxMsgs0}) ->
    IdxMsgs = lists:reverse(IdxMsgs0),
    {RaftIdxs, Data} = lists:unzip(IdxMsgs),
    [{log, RaftIdxs,
      fun(Log) ->
              Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {Reason, MsgId, Header}) ->
                                            {MsgId, {Reason, Header, Msg}}
                                    end, Log, Data),
              Msgs = case InMemMsgs of
                         [] ->
                             Msgs0;
                         _ ->
                             lists:sort(InMemMsgs ++ Msgs0)
                     end,
              [{send_msg, CPid, {dlx_delivery, Msgs}, [ra_event]}]
      end}].

state_enter(leader, QRef, QName, _State) ->
    start_worker(QRef, QName);
state_enter(_, _, _, State) ->
    terminate_worker(State).

start_worker(QRef, QName) ->
    RegName = registered_name(QName),
    %% We must ensure that starting the rabbit_fifo_dlx_worker succeeds.
    %% Therefore, we don't use an effect.
    %% Also therefore, if starting the rabbit_fifo_dlx_worker fails, let the whole Ra server process crash
    %% in which case another Ra node will become leader.
    %% supervisor:start_child/2 blocks until rabbit_fifo_dlx_worker:init/1 returns (TODO check if this is correct).
    %% That's okay since rabbit_fifo_dlx_worker:init/1 returns immediately by delegating
    %% initial setup to handle_continue/2.
    case whereis(RegName) of
        undefined ->
            {ok, Pid} = supervisor:start_child(rabbit_fifo_dlx_sup, [QRef, RegName]),
            rabbit_log:debug("started rabbit_fifo_dlx_worker (~s ~p)", [RegName, Pid]);
        Pid ->
            rabbit_log:debug("rabbit_fifo_dlx_worker (~s ~p) already started", [RegName, Pid])
    end.

terminate_worker(#?MODULE{consumer = #dlx_consumer{registered_name = RegName}}) ->
    case whereis(RegName) of
        undefined ->
            ok;
        Pid ->
            %% Note that we can't return a mod_call effect here because mod_call is executed on the leader only.
            ok = supervisor:terminate_child(rabbit_fifo_dlx_sup, Pid),
            rabbit_log:debug("terminated rabbit_fifo_dlx_worker (~s ~p)", [RegName, Pid])
    end;
terminate_worker(_) ->
    ok.

%% TODO consider not registering the worker name at all
%% because if there is a new worker process, it will always subscribe and tell us its new pid
registered_name(QName) when is_atom(QName) ->
    list_to_atom(atom_to_list(QName) ++ "_dlx").

consumer_pid(#?MODULE{consumer = #dlx_consumer{registered_name = Name}}) ->
    whereis(Name);
consumer_pid(_) ->
    undefined.

%% called when switching from at-least-once to at-most-once
cleanup(#?MODULE{consumer = Consumer,
                 discards = Discards} = State) ->
    terminate_worker(State),
    %% Return messages in the order they got discarded originally
    %% for the final at-most-once dead-lettering.
    CheckedReasonMsgs = case Consumer of
                            #dlx_consumer{checked_out = Checked} when is_map(Checked) ->
                                L0 = maps:to_list(Checked),
                                L1 = lists:keysort(1, L0),
                                {_, L2} = lists:unzip(L1),
                                L2;
                            _ ->
                                []
                        end,
    DiscardReasonMsgs = lqueue:to_list(Discards),
    CheckedReasonMsgs ++ DiscardReasonMsgs.

purge(#?MODULE{consumer = Con0,
               discards = Discards} = State0) ->
    {Con, CheckedMsgs} = case Con0 of
                             #dlx_consumer{checked_out = Checked} when is_map(Checked) ->
                                 L = maps:to_list(Checked),
                                 {_, CheckedReasonMsgs} = lists:unzip(L),
                                 {_, Msgs} = lists:unzip(CheckedReasonMsgs),
                                 C = Con0#dlx_consumer{checked_out = #{}},
                                 {C, Msgs};
                             _ ->
                                 {Con0, []}
                         end,
    DiscardReasonMsgs = lqueue:to_list(Discards),
    {_, DiscardMsgs} = lists:unzip(DiscardReasonMsgs),
    PurgedMsgs = CheckedMsgs ++ DiscardMsgs,
    State = State0#?MODULE{consumer = Con,
                           discards = lqueue:new(),
                           msg_bytes = 0,
                           msg_bytes_checkout = 0
                          },
    {State, PurgedMsgs}.

%% TODO Consider alternative to not dehydrate at all
%% by putting messages to disk before enqueueing them in discards queue.
dehydrate(#?MODULE{discards = Discards,
                   consumer = Con} = State) ->
    State#?MODULE{discards = dehydrate_messages(Discards),
                  consumer = dehydrate_consumer(Con)}.

dehydrate_messages(Discards) ->
    L0 = lqueue:to_list(Discards),
    L1 = lists:map(fun({_Reason, Msg}) ->
                           {?NIL, rabbit_fifo:dehydrate_message(Msg)}
                   end, L0),
    lqueue:from_list(L1).

dehydrate_consumer(#dlx_consumer{checked_out = Checked0} = Con) ->
    Checked = maps:map(fun (_, {_, Msg}) ->
                               {?NIL, rabbit_fifo:dehydrate_message(Msg)}
                       end, Checked0),
    Con#dlx_consumer{checked_out = Checked};
dehydrate_consumer(undefined) ->
    undefined.

normalize(#?MODULE{discards = Discards} = State) ->
    State#?MODULE{discards = lqueue:from_list(lqueue:to_list(Discards))}.
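To make the command flow above concrete, here is a hedged sketch of how rabbit_fifo might drive this module for a single dead-lettered message: discard it, register the worker as the single consumer, check it out, and settle it. It assumes the `{dlx, ...}` wrapper is unwrapped before `apply/2` is called (as the function heads above suggest), and that `Msg` is an already-built indexed message; the worker name is hypothetical.

```erlang
%% Illustrative sequence only (Msg is an indexed message as built by rabbit_fifo).
DlxState0 = rabbit_fifo_dlx:init(),
DlxState1 = rabbit_fifo_dlx:discard(Msg, expired, DlxState0),
{dlx, Checkout} = rabbit_fifo_dlx:make_checkout(my_qq_dlx, 100),
{DlxState2, ok} = rabbit_fifo_dlx:apply(Checkout, DlxState1),
%% checkout/1 assigns message ids and returns delivery effects for the worker
{DlxState3, _Effects} = rabbit_fifo_dlx:checkout(DlxState2),
{dlx, Settle} = rabbit_fifo_dlx:make_settle([0]),
{_DlxState4, [Msg]} = rabbit_fifo_dlx:apply(Settle, DlxState3).
```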
@@ -0,0 +1,30 @@
-define(NIL, []).

%% At-least-once dead-lettering does not support reason 'maxlen'.
%% Reason of prefix messages is [] because the message will not be
%% actually delivered and storing 2 bytes in the persisted snapshot
%% takes less space than storing the reason atom.
-type reason() :: 'expired' | 'rejected' | delivery_limit | ?NIL.

% See snapshot scenarios in rabbit_fifo_prop_SUITE. Add dlx dehydrate tests.
-record(dlx_consumer, {
          %% We don't require a consumer tag because a consumer tag is a means to distinguish
          %% multiple consumers in the same channel. The rabbit_fifo_dlx_worker channel-like process however
          %% creates only a single consumer to this quorum queue's discards queue.
          registered_name :: atom(),
          prefetch :: non_neg_integer(),
          checked_out = #{} :: #{msg_id() => {reason(), indexed_msg()}},
          next_msg_id = 0 :: msg_id() % part of snapshot data
          % total number of checked out messages - ever
          % incremented for each delivery
          % delivery_count = 0 :: non_neg_integer(),
          % status = up :: up | suspected_down | cancelled
         }).

-record(rabbit_fifo_dlx, {
          consumer = undefined :: #dlx_consumer{} | undefined,
          %% Queue of dead-lettered messages.
          discards = lqueue:new() :: lqueue:lqueue({reason(), indexed_msg()}),
          msg_bytes = 0 :: non_neg_integer(),
          msg_bytes_checkout = 0 :: non_neg_integer()
         }).
@@ -0,0 +1,93 @@
-module(rabbit_fifo_dlx_client).

-export([checkout/4, settle/2, handle_ra_event/3,
         overview/1]).

-record(state, {
          queue_resource :: rabbit_types:r(queue),
          leader :: ra:server_id(),
          last_msg_id :: non_neg_integer() | -1
         }).
-opaque state() :: #state{}.
-export_type([state/0]).

checkout(RegName, QResource, Leader, NumUnsettled) ->
    Cmd = rabbit_fifo_dlx:make_checkout(RegName, NumUnsettled),
    State = #state{queue_resource = QResource,
                   leader = Leader,
                   last_msg_id = -1},
    process_command(Cmd, State, 5).

settle(MsgIds, State) when is_list(MsgIds) ->
    Cmd = rabbit_fifo_dlx:make_settle(MsgIds),
    %%TODO use pipeline_command without correlation ID, i.e. without notification
    process_command(Cmd, State, 2).

process_command(_Cmd, _State, 0) ->
    {error, ra_command_failed};
process_command(Cmd, #state{leader = Leader} = State, Tries) ->
    case ra:process_command(Leader, Cmd, 60_000) of
        {ok, ok, Leader} ->
            {ok, State#state{leader = Leader}};
        {ok, ok, L} ->
            rabbit_log:warning("Failed to process command ~p on quorum queue leader ~p because actual leader is ~p.",
                               [Cmd, Leader, L]),
            {error, ra_command_failed};
        Err ->
            rabbit_log:warning("Failed to process command ~p on quorum queue leader ~p: ~p~n"
                               "Trying ~b more time(s)...",
                               [Cmd, Leader, Err, Tries]),
            process_command(Cmd, State, Tries - 1)
    end.

handle_ra_event(Leader, {machine, {dlx_delivery, _} = Del}, #state{leader = Leader} = State) ->
    handle_delivery(Del, State);
handle_ra_event(_From, Evt, State) ->
    rabbit_log:warning("~s received unknown ra event: ~p", [?MODULE, Evt]),
    {ok, State, []}.

handle_delivery({dlx_delivery, [{FstId, _} | _] = IdMsgs},
                #state{queue_resource = QRes,
                       last_msg_id = Prev} = State0) ->
    %% format as a deliver action
    {LastId, _} = lists:last(IdMsgs),
    Del = {deliver, transform_msgs(QRes, IdMsgs)},
    case Prev of
        Prev when FstId =:= Prev+1 ->
            %% expected message ID(s) got delivered
            State = State0#state{last_msg_id = LastId},
            {ok, State, [Del]};
        Prev when FstId > Prev+1 ->
            %% message ID(s) are missing, therefore fetch all checked-out discarded messages
            %% TODO implement as done in
            %% https://github.com/rabbitmq/rabbitmq-server/blob/b4eb5e2cfd7f85a1681617dc489dd347fa9aac72/deps/rabbit/src/rabbit_fifo_client.erl#L732-L744
            %% A: not needed because of local guarantees, let it crash
            exit(not_implemented);
        Prev when FstId =< Prev ->
            rabbit_log:debug("dropping messages with duplicate IDs (~b to ~b) consumed from ~s",
                             [FstId, Prev, rabbit_misc:rs(QRes)]),
            case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of
                [] ->
                    {ok, State0, []};
                IdMsgs2 ->
                    handle_delivery({dlx_delivery, IdMsgs2}, State0)
            end;
        _ when FstId =:= 0 ->
            % the very first delivery
            % TODO We init last_msg_id with -1. So, why would we ever run into this branch?
            % A: can be a leftover
            rabbit_log:debug("very first delivery consumed from ~s", [rabbit_misc:rs(QRes)]),
            State = State0#state{last_msg_id = 0},
            {ok, State, [Del]}
    end.

transform_msgs(QRes, Msgs) ->
    lists:map(
      fun({MsgId, {Reason, _MsgHeader, Msg}}) ->
              {QRes, MsgId, Msg, Reason}
      end, Msgs).

overview(#state{leader = Leader,
                last_msg_id = LastMsgId}) ->
    #{leader => Leader,
      last_msg_id => LastMsgId}.
@@ -0,0 +1,37 @@
-module(rabbit_fifo_dlx_sup).

-behaviour(supervisor).

-rabbit_boot_step({?MODULE,
                   [{description, "supervisor of quorum queue dead-letter workers"},
                    {mfa, {rabbit_sup, start_supervisor_child, [?MODULE]}},
                    {requires, kernel_ready},
                    {enables, core_initialized}]}).

%% supervisor callback
-export([init/1]).
%% client API
-export([start_link/0]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    FeatureFlag = quorum_queue,
    %%TODO rabbit_feature_flags:is_enabled(FeatureFlag) ?
    case rabbit_ff_registry:is_enabled(FeatureFlag) of
        true ->
            SupFlags = #{strategy => simple_one_for_one,
                         intensity => 1,
                         period => 5},
            Worker = rabbit_fifo_dlx_worker,
            ChildSpec = #{id => Worker,
                          start => {Worker, start_link, []},
                          type => worker,
                          modules => [Worker]},
            {ok, {SupFlags, [ChildSpec]}};
        false ->
            rabbit_log:info("not starting supervisor ~s because feature flag ~s is disabled",
                            [?MODULE, FeatureFlag]),
            ignore
    end.
@ -0,0 +1,571 @@
|
|||
%% This module consumes from a single quroum queue's discards queue (containing dead-letttered messages)
|
||||
%% and forwards the DLX messages at least once to every target queue.
|
||||
%%
|
||||
%% Some parts of this module resemble the channel process in the sense that it needs to keep track what messages
|
||||
%% are consumed but not acked yet and what messages are published but not confirmed yet.
|
||||
%% Compared to the channel process, this module is protocol independent since it doesn't deal with AMQP clients.
|
||||
%%
|
||||
%% This module consumes directly from the rabbit_fifo_dlx_client bypassing the rabbit_queue_type interface,
|
||||
%% but publishes via the rabbit_queue_type interface.
|
||||
%% While consuming via rabbit_queue_type interface would have worked in practice (by using a special consumer argument,
|
||||
%% e.g. {<<"x-internal-queue">>, longstr, <<"discards">>} ) using the rabbit_fifo_dlx_client directly provides
|
||||
%% separation of concerns making things much easier to test, to debug, and to understand.
|
||||
|
||||
-module(rabbit_fifo_dlx_worker).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
||||
|
||||
-behaviour(gen_server2).
|
||||
|
||||
-export([start_link/2]).
|
||||
%% gen_server2 callbacks
|
||||
-export([init/1, terminate/2, handle_continue/2,
|
||||
handle_cast/2, handle_call/3, handle_info/2,
|
||||
code_change/3, format_status/2]).
|
||||
|
||||
%%TODO make configurable or leave at 0 which means 2000 as in
|
||||
%% https://github.com/rabbitmq/rabbitmq-server/blob/1e7df8c436174735b1d167673afd3f1642da5cdc/deps/rabbit/src/rabbit_quorum_queue.erl#L726-L729
|
||||
-define(CONSUMER_PREFETCH_COUNT, 100).
|
||||
-define(HIBERNATE_AFTER, 180_000).
|
||||
%% If no publisher confirm was received for at least SETTLE_TIMEOUT, message will be redelivered.
|
||||
%% To prevent duplicates in the target queue and to ensure message will eventually be acked to the source queue,
|
||||
%% set this value higher than the maximum time it takes for a queue to settle a message.
|
||||
-define(SETTLE_TIMEOUT, 120_000).
|
||||
|
||||
-record(pending, {
|
||||
%% consumed_msg_id is not to be confused with consumer delivery tag.
|
||||
%% The latter represents a means for AMQP clients to (multi-)ack to a channel process.
|
||||
%% However, queues are not aware of delivery tags.
|
||||
%% This rabbit_fifo_dlx_worker does not have the concept of delivery tags because it settles (acks)
|
||||
%% message IDs directly back to the queue (and there is no AMQP consumer).
|
||||
consumed_msg_id :: non_neg_integer(),
|
||||
content :: rabbit_types:decoded_content(),
|
||||
%% TODO Reason is already stored in first x-death header of #content.properties.#'P_basic'.headers
|
||||
%% So, we could remove this convenience field and lookup the 1st header when redelivering.
|
||||
reason :: rabbit_fifo_dlx:reason(),
|
||||
%%
|
||||
%%TODO instead of using 'unsettled' and 'settled' fields, use rabbit_confirms because it handles many to one logic
|
||||
%% in a generic way. Its API might need to be modified though if it is targeted only towards channel.
|
||||
%%
|
||||
%% target queues for which publisher confirm has not been received yet
|
||||
unsettled = [] :: [rabbit_amqqueue:name()],
|
||||
%% target queues for which publisher confirm was received
|
||||
settled = [] :: [rabbit_amqqueue:name()],
|
||||
%% Number of times the message was published (i.e. rabbit_queue_type:deliver/3 invoked).
|
||||
%% Can be 0 if the message was never published (for example no route exists).
|
||||
publish_count = 0 :: non_neg_integer(),
|
||||
%% Epoch time in milliseconds when the message was last published (i.e. rabbit_queue_type:deliver/3 invoked).
|
||||
%% It can be 'undefined' if the message was never published (for example no route exists).
|
||||
last_published_at :: undefined | integer(),
|
||||
%% Epoch time in milliseconds when the message was consumed from the source quorum queue.
|
||||
%% This value never changes.
|
||||
%% It's mainly informational and meant for debugging to understand for how long the message
|
||||
%% is sitting around without having received all publisher confirms.
|
||||
consumed_at :: integer()
|
||||
}).
|
||||
|
||||
-record(state, {
|
||||
registered_name :: atom(),
|
||||
%% There is one rabbit_fifo_dlx_worker per source quorum queue
|
||||
%% (if dead-letter-strategy at-least-once is used).
|
||||
queue_ref :: rabbit_amqqueue:name(),
|
||||
%% configured (x-)dead-letter-exchange of source queue
|
||||
exchange_ref,
|
||||
%% configured (x-)dead-letter-routing-key of source queue
|
||||
routing_key,
|
||||
dlx_client_state :: rabbit_fifo_dlx_client:state(),
|
||||
queue_type_state :: rabbit_queue_type:state(),
|
||||
%% Consumed messages for which we have not received all publisher confirms yet.
|
||||
%% Therefore, they have not been ACKed yet to the consumer queue.
|
||||
%% This buffer contains at most CONSUMER_PREFETCH_COUNT pending messages at any given point in time.
|
||||
pendings = #{} :: #{OutSeq :: non_neg_integer() => #pending{}},
|
||||
%% next publisher confirm delivery tag sequence number
|
||||
next_out_seq = 1,
|
||||
%% Timer firing every SETTLE_TIMEOUT milliseconds
|
||||
%% redelivering messages for which not all publisher confirms were received.
|
||||
%% If there are no pending messages, this timer will eventually be cancelled to allow
|
||||
%% this worker to hibernate.
|
||||
timer :: undefined | reference()
|
||||
}).
|
||||
|
||||
% -type state() :: #state{}.
|
||||
|
||||
%%TODO add metrics like global counters for messages routed, delivered, etc.
|
||||
|
||||
start_link(QRef, RegName) ->
|
||||
gen_server:start_link({local, RegName},
|
||||
?MODULE, {QRef, RegName},
|
||||
[{hibernate_after, ?HIBERNATE_AFTER}]).
|
||||
|
||||
-spec init({rabbit_amqqueue:name(), atom()}) -> {ok, undefined, {continue, {rabbit_amqqueue:name(), atom()}}}.
|
||||
init(Arg) ->
|
||||
{ok, undefined, {continue, Arg}}.
|
||||
|
||||
handle_continue({QRef, RegName}, undefined) ->
|
||||
State = lookup_topology(#state{queue_ref = QRef}),
|
||||
{ok, Q} = rabbit_amqqueue:lookup(QRef),
|
||||
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
|
||||
{ok, ConsumerState} = rabbit_fifo_dlx_client:checkout(RegName,
|
||||
QRef,
|
||||
{ClusterName, node()},
|
||||
?CONSUMER_PREFETCH_COUNT),
|
||||
{noreply, State#state{registered_name = RegName,
|
||||
dlx_client_state = ConsumerState,
|
||||
queue_type_state = rabbit_queue_type:init()}}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
%%TODO cancel timer?
|
||||
ok.
|
||||
|
||||
handle_call(Request, From, State) ->
|
||||
rabbit_log:warning("~s received unhandled call from ~p: ~p", [?MODULE, From, Request]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_cast({queue_event, QRef, {_From, {machine, lookup_topology}}},
|
||||
#state{queue_ref = QRef} = State0) ->
|
||||
State = lookup_topology(State0),
|
||||
redeliver_and_ack(State);
|
||||
handle_cast({queue_event, QRef, {From, Evt}},
|
||||
#state{queue_ref = QRef,
|
||||
dlx_client_state = DlxState0} = State0) ->
|
||||
%% received dead-letter messsage from source queue
|
||||
% rabbit_log:debug("~s received queue event: ~p", [rabbit_misc:rs(QRef), E]),
|
||||
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(From, Evt, DlxState0),
|
||||
State1 = State0#state{dlx_client_state = DlxState},
|
||||
State = handle_queue_actions(Actions, State1),
|
||||
{noreply, State};
|
||||
handle_cast({queue_event, QRef, Evt},
|
||||
#state{queue_type_state = QTypeState0} = State0) ->
|
||||
%% received e.g. confirm from target queue
|
||||
case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
|
||||
{ok, QTypeState1, Actions} ->
|
||||
State1 = State0#state{queue_type_state = QTypeState1},
|
||||
State = handle_queue_actions(Actions, State1),
|
||||
{noreply, State};
|
||||
%% TODO handle as done in
|
||||
%% https://github.com/rabbitmq/rabbitmq-server/blob/9cf18e83f279408e20430b55428a2b19156c90d7/deps/rabbit/src/rabbit_channel.erl#L771-L783
|
||||
eol ->
|
||||
{noreply, State0};
|
||||
{protocol_error, _Type, _Reason, _ReasonArgs} ->
|
||||
{noreply, State0}
|
||||
end;
|
||||
handle_cast(settle_timeout, State0) ->
|
||||
State = State0#state{timer = undefined},
|
||||
redeliver_and_ack(State);
|
||||
handle_cast(Request, State) ->
|
||||
rabbit_log:warning("~s received unhandled cast ~p", [?MODULE, Request]),
|
||||
{noreply, State}.
|
||||
|
||||
redeliver_and_ack(State0) ->
|
||||
State1 = redeliver_messsages(State0),
|
||||
%% Routes could have been changed dynamically.
|
||||
%% If a publisher confirm timed out for a target queue to which we now don't route anymore, ack the message.
|
||||
State2 = maybe_ack(State1),
|
||||
State = maybe_set_timer(State2),
|
||||
{noreply, State}.
|
||||
|
||||
%%TODO monitor source quorum queue upon init / handle_continue and terminate ourself if source quorum queue is DOWN
|
||||
%% since new leader will re-create a worker
|
||||
handle_info({'DOWN', _MRef, process, QPid, Reason},
|
||||
#state{queue_type_state = QTypeState0} = State0) ->
|
||||
%% received from target classic queue
|
||||
State = case rabbit_queue_type:handle_down(QPid, Reason, QTypeState0) of
|
||||
{ok, QTypeState, Actions} ->
|
||||
State1 = State0#state{queue_type_state = QTypeState},
|
||||
handle_queue_actions(Actions, State1);
|
||||
{eol, QTypeState1, QRef} ->
|
||||
QTypeState = rabbit_queue_type:remove(QRef, QTypeState1),
|
||||
State0#state{queue_type_state = QTypeState}
|
||||
end,
|
||||
{noreply, State};
|
||||
handle_info(Info, State) ->
|
||||
rabbit_log:warning("~s received unhandled info ~p", [?MODULE, Info]),
|
||||
{noreply, State}.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) ->
|
||||
{ok, Q} = rabbit_amqqueue:lookup(QRef),
|
||||
DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>, fun(_Pol, QArg) -> QArg end, Q),
|
||||
DLX = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-exchange">>, fun(_Pol, QArg) -> QArg end, Q),
|
||||
DLXRef = rabbit_misc:r(Vhost, exchange, DLX),
|
||||
State#state{exchange_ref = DLXRef,
|
||||
routing_key = DLRKey}.
|
||||
|
||||
%% https://github.com/rabbitmq/rabbitmq-server/blob/9cf18e83f279408e20430b55428a2b19156c90d7/deps/rabbit/src/rabbit_channel.erl#L2855-L2888
|
||||
handle_queue_actions(Actions, State0) ->
|
||||
lists:foldl(
|
||||
fun ({deliver, Msgs}, S0) ->
|
||||
S1 = handle_deliver(Msgs, S0),
|
||||
maybe_set_timer(S1);
|
||||
({settled, QRef, MsgSeqs}, S0) ->
|
||||
S1 = handle_settled(QRef, MsgSeqs, S0),
|
||||
S2 = maybe_ack(S1),
|
||||
maybe_cancel_timer(S2);
|
||||
({rejected, QRef, MsgSeqNos}, S0) ->
|
||||
rabbit_log:debug("Ignoring rejected messages ~p from ~s", [MsgSeqNos, rabbit_misc:rs(QRef)]),
|
||||
S0;
|
||||
({queue_down, QRef}, S0) ->
|
||||
%% target classic queue is down, but not deleted
|
||||
rabbit_log:debug("Ignoring DOWN from ~s", [rabbit_misc:rs(QRef)]),
|
||||
S0
|
||||
end, State0, Actions).
|
||||
|
||||
handle_deliver(Msgs, #state{queue_ref = QRef} = State) when is_list(Msgs) ->
|
||||
DLX = lookup_dlx(State),
|
||||
lists:foldl(fun({_QRef, MsgId, Msg, Reason}, S) ->
|
||||
forward(Msg, MsgId, QRef, DLX, Reason, S)
|
||||
end, State, Msgs).
|
||||
|
||||
lookup_dlx(#state{exchange_ref = DLXRef,
|
||||
queue_ref = QRef}) ->
|
||||
case rabbit_exchange:lookup(DLXRef) of
|
||||
{error, not_found} ->
|
||||
rabbit_log:warning("Cannot forward any dead-letter messages from source quorum ~s because its configured "
|
||||
"dead-letter-exchange ~s does not exist. "
|
||||
"Either create the configured dead-letter-exchange or re-configure "
|
||||
"the dead-letter-exchange policy for the source quorum queue to prevent "
|
||||
"dead-lettered messages from piling up in the source quorum queue.",
|
||||
[rabbit_misc:rs(QRef), rabbit_misc:rs(DLXRef)]),
|
||||
not_found;
|
||||
{ok, X} ->
|
||||
X
|
||||
end.
|
||||
|
||||
forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
|
||||
#state{next_out_seq = OutSeq,
|
||||
pendings = Pendings,
|
||||
exchange_ref = DLXRef,
|
||||
routing_key = RKey} = State0) ->
|
||||
#basic_message{content = Content, routing_keys = RKeys} = Msg =
|
||||
rabbit_dead_letter:make_msg(ConsumedMsg, Reason, DLXRef, RKey, ConsumedQRef),
|
||||
%% Field 'mandatory' is set to false because our module checks on its own whether the message is routable.
|
||||
Delivery = rabbit_basic:delivery(_Mandatory = false, _Confirm = true, Msg, OutSeq),
|
||||
TargetQs = case DLX of
|
||||
not_found ->
|
||||
[];
|
||||
_ ->
|
||||
RouteToQs = rabbit_exchange:route(DLX, Delivery),
|
||||
case rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs) of
|
||||
{[], []} ->
|
||||
rabbit_log:warning("Cannot deliver message with sequence number ~b "
|
||||
"(for consumed message sequence number ~b) "
|
||||
"because no queue is bound to dead-letter ~s with routing keys ~p.",
|
||||
[OutSeq, ConsumedMsgId, rabbit_misc:rs(DLXRef), RKeys]),
|
||||
[];
|
||||
{Qs, []} ->
|
||||
%% the "normal" case, i.e. no dead-letter-topology misconfiguration
|
||||
Qs;
|
||||
{[], Cycles} ->
|
||||
%%TODO introduce structured logging in rabbit_log by using type logger:report
|
||||
rabbit_log:warning("Cannot route to any queues. Detected dead-letter queue cycles. "
|
||||
"Fix the dead-letter routing topology to prevent dead-letter messages from "
|
||||
"piling up in source quorum queue. "
|
||||
"outgoing_sequene_number=~b "
|
||||
"consumed_message_sequence_number=~b "
|
||||
"consumed_queue=~s "
|
||||
"dead_letter_exchange=~s "
|
||||
"effective_dead_letter_routing_keys=~p "
|
||||
"routed_to_queues=~s "
|
||||
"dead_letter_queue_cycles=~p",
|
||||
[OutSeq, ConsumedMsgId, rabbit_misc:rs(ConsumedQRef),
|
||||
rabbit_misc:rs(DLXRef), RKeys, strings(RouteToQs), Cycles]),
|
||||
[];
|
||||
{Qs, Cycles} ->
|
||||
rabbit_log:warning("Detected dead-letter queue cycles. "
|
||||
"Fix the dead-letter routing topology. "
|
||||
"outgoing_sequene_number=~b "
|
||||
"consumed_message_sequence_number=~b "
|
||||
"consumed_queue=~s "
|
||||
"dead_letter_exchange=~s "
|
||||
"effective_dead_letter_routing_keys=~p "
|
||||
"routed_to_queues_desired=~s "
|
||||
"routed_to_queues_effective=~s "
|
||||
"dead_letter_queue_cycles=~p",
|
||||
[OutSeq, ConsumedMsgId, rabbit_misc:rs(ConsumedQRef),
|
||||
rabbit_misc:rs(DLXRef), RKeys, strings(RouteToQs), strings(Qs), Cycles]),
|
||||
%% Ignore the target queues resulting in cycles.
|
||||
%% We decide it's good enough to deliver to only routable target queues.
|
||||
Qs
|
||||
end
|
||||
end,
|
||||
Now = os:system_time(millisecond),
|
||||
State1 = State0#state{next_out_seq = OutSeq + 1},
|
||||
Pend0 = #pending{
|
||||
consumed_msg_id = ConsumedMsgId,
|
||||
consumed_at = Now,
|
||||
content = Content,
|
||||
reason = Reason
|
||||
},
|
||||
case TargetQs of
|
||||
[] ->
|
||||
%% We can't deliver this message since there is no target queue we can route to.
|
||||
%% Under no circumstances should we drop a message with dead-letter-strategy at-least-once.
|
||||
%% We buffer this message and retry to send every SETTLE_TIMEOUT milliseonds
|
||||
%% (until the user has fixed the dead-letter routing topology).
|
||||
State1#state{pendings = maps:put(OutSeq, Pend0, Pendings)};
|
||||
_ ->
|
||||
Pend = Pend0#pending{publish_count = 1,
|
||||
last_published_at = Now,
|
||||
unsettled = TargetQs},
|
||||
State = State1#state{pendings = maps:put(OutSeq, Pend, Pendings)},
|
||||
deliver_to_queues(Delivery, TargetQs, State)
|
||||
end.
|
||||
|
||||
deliver_to_queues(Delivery, RouteToQNames, #state{queue_type_state = QTypeState0} = State0) ->
|
||||
Qs = rabbit_amqqueue:lookup(RouteToQNames),
|
||||
{ok, QTypeState1, Actions} = rabbit_queue_type:deliver(Qs, Delivery, QTypeState0),
|
||||
State = State0#state{queue_type_state = QTypeState1},
|
||||
handle_queue_actions(Actions, State).
|
||||
|
||||
handle_settled(QRef, MsgSeqs, #state{pendings = Pendings0} = State) ->
|
||||
Pendings = lists:foldl(fun (MsgSeq, P0) ->
|
||||
handle_settled0(QRef, MsgSeq, P0)
|
||||
end, Pendings0, MsgSeqs),
|
||||
State#state{pendings = Pendings}.
|
||||
|
||||
handle_settled0(QRef, MsgSeq, Pendings) ->
|
||||
case maps:find(MsgSeq, Pendings) of
|
||||
{ok, #pending{unsettled = Unset0, settled = Set0} = Pend0} ->
|
||||
Unset = lists:delete(QRef, Unset0),
|
||||
Set = [QRef | Set0],
|
||||
Pend = Pend0#pending{unsettled = Unset, settled = Set},
|
||||
maps:update(MsgSeq, Pend, Pendings);
|
||||
error ->
|
||||
rabbit_log:warning("Ignoring publisher confirm for sequence number ~b "
|
||||
"from target dead letter ~s after settle timeout of ~bms. "
|
||||
"Troubleshoot why that queue confirms so slowly.",
|
||||
[MsgSeq, rabbit_misc:rs(QRef), ?SETTLE_TIMEOUT]),
|
||||
Pendings
|
||||
end.
|
||||
|
||||
maybe_ack(#state{pendings = Pendings0,
|
||||
dlx_client_state = DlxState0} = State) ->
|
||||
Settled = maps:filter(fun(_OutSeq, #pending{unsettled = [], settled = [_|_]}) ->
|
||||
%% Ack because there is at least one target queue and all
|
||||
%% target queues settled (i.e. combining publisher confirm
|
||||
%% and mandatory flag semantics).
|
||||
true;
|
||||
(_, _) ->
|
||||
false
|
||||
end, Pendings0),
|
||||
case maps:size(Settled) of
|
||||
0 ->
|
||||
%% nothing to ack
|
||||
State;
|
||||
_ ->
|
||||
Ids = lists:map(fun(#pending{consumed_msg_id = Id}) -> Id end, maps:values(Settled)),
|
||||
case rabbit_fifo_dlx_client:settle(Ids, DlxState0) of
|
||||
{ok, DlxState} ->
|
||||
SettledOutSeqs = maps:keys(Settled),
|
||||
Pendings = maps:without(SettledOutSeqs, Pendings0),
|
||||
State#state{pendings = Pendings,
|
||||
dlx_client_state = DlxState};
|
||||
{error, _Reason} ->
|
||||
%% Failed to ack. Ack will be retried in the next maybe_ack/1
|
||||
State
|
||||
end
|
||||
end.
|
||||
|
||||
%% Re-deliver messages that timed out waiting on publisher confirm and
|
||||
%% messages that got never sent due to routing topology misconfiguration.
|
||||
redeliver_messsages(#state{pendings = Pendings} = State) ->
|
||||
case lookup_dlx(State) of
|
||||
not_found ->
|
||||
%% Configured dead-letter-exchange does (still) not exist.
|
||||
%% Warning got already logged.
|
||||
%% Keep the same Pendings in our state until user creates or re-configures the dead-letter-exchange.
|
||||
State;
|
||||
DLX ->
|
||||
Now = os:system_time(millisecond),
|
||||
maps:fold(fun(OutSeq, #pending{last_published_at = LastPub} = Pend, S0)
|
||||
when LastPub + ?SETTLE_TIMEOUT =< Now ->
|
||||
%% Publisher confirm timed out.
|
||||
redeliver(Pend, DLX, OutSeq, S0);
|
||||
(OutSeq, #pending{last_published_at = undefined} = Pend, S0) ->
|
||||
%% Message was never published due to dead-letter routing topology misconfiguration.
|
||||
redeliver(Pend, DLX, OutSeq, S0);
|
||||
(_OutSeq, _Pending, S) ->
|
||||
%% Publisher confirm did not time out.
|
||||
S
|
||||
end, State, Pendings)
|
||||
end.
|
||||
|
||||
redeliver(#pending{content = Content} = Pend, DLX, OldOutSeq,
|
||||
#state{routing_key = undefined} = State) ->
|
||||
%% No dead-letter-routing-key defined for source quorum queue.
|
||||
%% Therefore use all of messages's original routing keys (which can include CC and BCC recipients).
|
||||
%% This complies with the behaviour of the rabbit_dead_letter module.
|
||||
%% We stored these original routing keys in the 1st (i.e. most recent) x-death entry.
|
||||
#content{properties = #'P_basic'{headers = Headers}} =
|
||||
rabbit_binary_parser:ensure_content_decoded(Content),
|
||||
{array, [{table, MostRecentDeath}|_]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
|
||||
{<<"routing-keys">>, array, Routes0} = lists:keyfind(<<"routing-keys">>, 1, MostRecentDeath),
|
||||
Routes = [Route || {longstr, Route} <- Routes0],
|
||||
redeliver0(Pend, DLX, Routes, OldOutSeq, State);
|
||||
redeliver(Pend, DLX, OldOutSeq, #state{routing_key = DLRKey} = State) ->
|
||||
redeliver0(Pend, DLX, [DLRKey], OldOutSeq, State).
|
||||
|
||||
%% Quorum queues maintain their own Raft sequene number mapping to the message sequence number (= Raft correlation ID).
|
||||
%% So, they would just send us a 'settled' queue action containing the correct message sequence number.
|
||||
%%
|
||||
%% Classic queues however maintain their state by mapping the message sequence number to pending and confirmed queues.
|
||||
%% While re-using the same message sequence number could work there as well, it just gets unnecssary complicated when
|
||||
%% different target queues settle two separate deliveries referring to the same message sequence number (and same basic message).
|
||||
%%
|
||||
%% Therefore, to keep things simple, create a brand new delivery, store it in our state and forget about the old delivery and
|
||||
%% sequence number.
|
||||
%%
|
||||
%% If a sequene number gets settled after SETTLE_TIMEOUT, we can't map it anymore to the #pending{}. Hence, we ignore it.
|
||||
%%
|
||||
%% This can lead to issues when SETTLE_TIMEOUT is too low and time to settle takes too long.
|
||||
%% For example, if SETTLE_TIMEOUT is set to only 10 seconds, but settling a message takes always longer than 10 seconds
|
||||
%% (e.g. due to extremly slow hypervisor disks that ran out of credit), we will re-deliver the same message all over again
|
||||
%% leading to many duplicates in the target queue without ever acking the message back to the source discards queue.
|
||||
%%
|
||||
%% Therefore, set SETTLE_TIMEOUT reasonably high (e.g. 2 minutes).
|
||||
%%
|
||||
%% TODO do not log per message?
|
||||
redeliver0(#pending{consumed_msg_id = ConsumedMsgId,
                    content = Content,
                    unsettled = Unsettled,
                    settled = Settled,
                    publish_count = PublishCount,
                    reason = Reason} = Pend0,
           DLX, DLRKeys, OldOutSeq,
           #state{next_out_seq = OutSeq,
                  queue_ref = QRef,
                  pendings = Pendings0,
                  exchange_ref = DLXRef} = State0) when is_list(DLRKeys) ->
    BasicMsg = #basic_message{exchange_name = DLXRef,
                              routing_keys = DLRKeys,
                              %% BCC Header was already stripped previously
                              content = Content,
                              id = rabbit_guid:gen(),
                              is_persistent = rabbit_basic:is_message_persistent(Content)
                             },
    %% Field 'mandatory' is set to false because our module checks on its own whether the message is routable.
    Delivery = rabbit_basic:delivery(_Mandatory = false, _Confirm = true, BasicMsg, OutSeq),
    RouteToQs0 = rabbit_exchange:route(DLX, Delivery),
    %% Do not re-deliver to queues for which we already received a publisher confirm.
    RouteToQs1 = RouteToQs0 -- Settled,
    {RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, BasicMsg, RouteToQs1),
    Prefix = io_lib:format("Message has not received required publisher confirm(s). "
                           "Received confirm from: [~s]. "
                           "Did not receive confirm from: [~s]. "
                           "timeout=~bms "
                           "message_sequence_number=~b "
                           "consumed_message_sequence_number=~b "
                           "publish_count=~b.",
                           [strings(Settled), strings(Unsettled), ?SETTLE_TIMEOUT,
                            OldOutSeq, ConsumedMsgId, PublishCount]),
    case {RouteToQs, Cycles, Settled} of
        {[], [], []} ->
            rabbit_log:warning("~s Failed to re-deliver this message because no queue is bound "
                               "to dead-letter ~s with routing keys ~p.",
                               [Prefix, rabbit_misc:rs(DLXRef), DLRKeys]),
            State0;
        {[], [], [_|_]} ->
            rabbit_log:debug("~s Routes changed dynamically so that this message does not need to be routed "
                             "to any queue anymore. This message will be acknowledged to the source ~s.",
                             [Prefix, rabbit_misc:rs(QRef)]),
            State0;
        {[], [_|_], []} ->
            rabbit_log:warning("~s Failed to re-deliver this message because dead-letter queue cycles "
                               "got detected: ~p",
                               [Prefix, Cycles]),
            State0;
        {[], [_|_], [_|_]} ->
            rabbit_log:warning("~s Dead-letter queue cycles detected: ~p. "
                               "This message will nevertheless be acknowledged to the source ~s "
                               "because it received at least one publisher confirm.",
                               [Prefix, Cycles, rabbit_misc:rs(QRef)]),
            State0;
        _ ->
            case Cycles of
                [] ->
                    rabbit_log:debug("~s Re-delivering this message to ~s",
                                     [Prefix, strings(RouteToQs)]);
                [_|_] ->
                    rabbit_log:warning("~s Dead-letter queue cycles detected: ~p. "
                                       "Re-delivering this message only to ~s",
                                       [Prefix, Cycles, strings(RouteToQs)])
            end,
            Pend = Pend0#pending{publish_count = PublishCount + 1,
                                 last_published_at = os:system_time(millisecond),
                                 %% override 'unsettled' because topology could have changed
                                 unsettled = RouteToQs},
            Pendings1 = maps:remove(OldOutSeq, Pendings0),
            Pendings = maps:put(OutSeq, Pend, Pendings1),
            State = State0#state{next_out_seq = OutSeq + 1,
                                 pendings = Pendings},
            deliver_to_queues(Delivery, RouteToQs, State)
    end.

strings(QRefs) when is_list(QRefs) ->
    L0 = lists:map(fun rabbit_misc:rs/1, QRefs),
    L1 = lists:join(", ", L0),
    lists:flatten(L1).

maybe_set_timer(#state{timer = TRef} = State) when is_reference(TRef) ->
    State;
maybe_set_timer(#state{timer = undefined,
                       pendings = Pendings} = State) when map_size(Pendings) =:= 0 ->
    State;
maybe_set_timer(#state{timer = undefined} = State) ->
    TRef = erlang:send_after(?SETTLE_TIMEOUT, self(), {'$gen_cast', settle_timeout}),
    % rabbit_log:debug("set timer"),
    State#state{timer = TRef}.

maybe_cancel_timer(#state{timer = undefined} = State) ->
    State;
maybe_cancel_timer(#state{timer = TRef,
                          pendings = Pendings} = State) ->
    case maps:size(Pendings) of
        0 ->
            erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
            % rabbit_log:debug("cancelled timer"),
            State#state{timer = undefined};
        _ ->
            State
    end.

%% Avoids large message contents being logged.
format_status(_Opt, [_PDict, #state{
                                registered_name = RegisteredName,
                                queue_ref = QueueRef,
                                exchange_ref = ExchangeRef,
                                routing_key = RoutingKey,
                                dlx_client_state = DlxClientState,
                                queue_type_state = QueueTypeState,
                                pendings = Pendings,
                                next_out_seq = NextOutSeq,
                                timer = Timer
                               }]) ->
    S = #{registered_name => RegisteredName,
          queue_ref => QueueRef,
          exchange_ref => ExchangeRef,
          routing_key => RoutingKey,
          dlx_client_state => rabbit_fifo_dlx_client:overview(DlxClientState),
          queue_type_state => QueueTypeState,
          pendings => maps:map(fun(_, P) -> format_pending(P) end, Pendings),
          next_out_seq => NextOutSeq,
          timer_is_active => Timer =/= undefined},
    [{data, [{"State", S}]}].

format_pending(#pending{consumed_msg_id = ConsumedMsgId,
                        reason = Reason,
                        unsettled = Unsettled,
                        settled = Settled,
                        publish_count = PublishCount,
                        last_published_at = LastPublishedAt,
                        consumed_at = ConsumedAt}) ->
    #{consumed_msg_id => ConsumedMsgId,
      reason => Reason,
      unsettled => Unsettled,
      settled => Settled,
      publish_count => PublishCount,
      last_published_at => LastPublishedAt,
      consumed_at => ConsumedAt}.

@@ -130,6 +130,8 @@
              state/0,
              config/0]).

%% This function is never called since only rabbit_fifo_v0:init/1 is called.
%% See https://github.com/rabbitmq/ra/blob/e0d1e6315a45f5d3c19875d66f9d7bfaf83a46e3/src/ra_machine.erl#L258-L265
-spec init(config()) -> state().
init(#{name := Name,
       queue_resource := Resource} = Conf) ->

@@ -30,6 +30,7 @@ register() ->
     {Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
                       {policy_validator, <<"dead-letter-exchange">>},
                       {policy_validator, <<"dead-letter-routing-key">>},
                       {policy_validator, <<"dead-letter-strategy">>},
                       {policy_validator, <<"message-ttl">>},
                       {policy_validator, <<"expires">>},
                       {policy_validator, <<"max-length">>},

@@ -85,6 +86,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value)
validate_policy0(<<"dead-letter-routing-key">>, Value) ->
    {error, "~p is not a valid dead letter routing key", [Value]};

validate_policy0(<<"dead-letter-strategy">>, <<"at-most-once">>) ->
    ok;
validate_policy0(<<"dead-letter-strategy">>, <<"at-least-once">>) ->
    ok;
validate_policy0(<<"dead-letter-strategy">>, Value) ->
    {error, "~p is not a valid dead letter strategy", [Value]};

validate_policy0(<<"message-ttl">>, Value)
  when is_integer(Value), Value >= 0 ->
    ok;

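For illustration, a policy definition (written as the key/value pairs these clauses check) that would be accepted by all of the validators above; the exchange name and routing key are placeholders:

[{<<"dead-letter-exchange">>,    <<"my-dlx">>},
 {<<"dead-letter-routing-key">>, <<"dead">>},
 {<<"dead-letter-strategy">>,    <<"at-least-once">>},
 {<<"message-ttl">>,             60000}]
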
@@ -71,6 +71,7 @@

-include_lib("stdlib/include/qlc.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include("amqqueue.hrl").

-type msg_id() :: non_neg_integer().

@@ -94,7 +95,9 @@
                     single_active_consumer_pid,
                     single_active_consumer_ctag,
                     messages_ram,
                     message_bytes_ram
                     message_bytes_ram,
                     messages_dlx,
                     message_bytes_dlx
                    ]).

-define(INFO_KEYS, [name, durable, auto_delete, arguments, pid, messages, messages_ready,

@@ -227,18 +230,17 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
    {Name, _} = amqqueue:get_pid(Q),
    %% take the minimum value of the policy and the queue arg if present
    MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
    %% prefer the policy defined strategy if available
    Overflow = args_policy_lookup(<<"overflow">>, fun (A, _B) -> A end , Q),
    OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q),
    Overflow = overflow(OverflowBin, drop_head, QName),
    MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
    MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
    MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
    DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
    Expires = args_policy_lookup(<<"expires">>,
                                 fun (A, _B) -> A end,
                                 Q),
    Expires = args_policy_lookup(<<"expires">>, fun policyHasPrecedence/2, Q),
    MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
    #{name => Name,
      queue_resource => QName,
      dead_letter_handler => dlx_mfa(Q),
      dead_letter_handler => dead_letter_handler(Q, Overflow),
      become_leader_handler => {?MODULE, become_leader, [QName]},
      max_length => MaxLength,
      max_bytes => MaxBytes,

@@ -246,11 +248,17 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
      max_in_memory_bytes => MaxMemoryBytes,
      single_active_consumer_on => single_active_consumer_on(Q),
      delivery_limit => DeliveryLimit,
      overflow_strategy => overflow(Overflow, drop_head, QName),
      overflow_strategy => Overflow,
      created => erlang:system_time(millisecond),
      expires => Expires
      expires => Expires,
      msg_ttl => MsgTTL
     }.

policyHasPrecedence(Policy, _QueueArg) ->
    Policy.
queueArgHasPrecedence(_Policy, QueueArg) ->
    QueueArg.

single_active_consumer_on(Q) ->
    QArguments = amqqueue:get_arguments(Q),
    case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of

@@ -293,7 +301,7 @@ become_leader(QName, Name) ->
    end,
    %% as this function is called synchronously when a ra node becomes leader
    %% we need to ensure there is no chance of blocking as else the ra node
    %% may not be able to establish it's leadership
    %% may not be able to establish its leadership
    spawn(fun() ->
              rabbit_misc:execute_mnesia_transaction(
                fun() ->

@@ -377,19 +385,20 @@ filter_quorum_critical(Queues, ReplicaStates) ->
capabilities() ->
    #{unsupported_policies =>
          [ %% Classic policies
            <<"message-ttl">>, <<"max-priority">>, <<"queue-mode">>,
            <<"max-priority">>, <<"queue-mode">>,
            <<"single-active-consumer">>, <<"ha-mode">>, <<"ha-params">>,
            <<"ha-sync-mode">>, <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
            <<"queue-master-locator">>,
            %% Stream policies
            <<"max-age">>, <<"stream-max-segment-size-bytes">>,
            <<"queue-leader-locator">>, <<"initial-cluster-size">>],
      queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
                          <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
                          <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
                          <<"x-max-in-memory-bytes">>, <<"x-overflow">>,
                          <<"x-single-active-consumer">>, <<"x-queue-type">>,
                          <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
      queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
                          <<"x-dead-letter-strategy">>, <<"x-expires">>, <<"x-max-length">>,
                          <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
                          <<"x-max-in-memory-bytes">>, <<"x-overflow">>,
                          <<"x-single-active-consumer">>, <<"x-queue-type">>,
                          <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>,
                          <<"x-message-ttl">>],
      consumer_arguments => [<<"x-priority">>, <<"x-credit">>],
      server_named => false}.

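A sketch of how a client could opt in to the newly advertised quorum queue arguments (x-dead-letter-strategy and x-message-ttl), assuming the Erlang amqp_client library, an already opened channel Ch, and placeholder queue/exchange names:

-include_lib("amqp_client/include/amqp_client.hrl").

declare_quorum_queue_with_dlx(Ch) ->
    #'queue.declare_ok'{} =
        amqp_channel:call(Ch,
            #'queue.declare'{queue     = <<"orders">>,
                             durable   = true,
                             arguments = [{<<"x-queue-type">>,           longstr, <<"quorum">>},
                                          {<<"x-dead-letter-exchange">>, longstr, <<"my-dlx">>},
                                          {<<"x-dead-letter-strategy">>, longstr, <<"at-least-once">>},
                                          {<<"x-overflow">>,             longstr, <<"reject-publish">>},
                                          {<<"x-message-ttl">>,          long,    60000}]}).

x-overflow is set to reject-publish here because, as the dead_letter_handler resolution further below shows, at-least-once dead-lettering falls back to at-most-once when the effective overflow strategy is drop-head.
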
@@ -410,7 +419,7 @@ spawn_notify_decorators(QName, Fun, Args) ->
      end).

handle_tick(QName,
            {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
            {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack, MsgBytesDiscard},
            Nodes) ->
    %% this makes calls to remote processes so cannot be run inside the
    %% ra server

@@ -429,8 +438,8 @@ handle_tick(QName,
     {consumer_utilisation, Util},
     {message_bytes_ready, MsgBytesReady},
     {message_bytes_unacknowledged, MsgBytesUnack},
     {message_bytes, MsgBytesReady + MsgBytesUnack},
     {message_bytes_persistent, MsgBytesReady + MsgBytesUnack},
     {message_bytes, MsgBytesReady + MsgBytesUnack + MsgBytesDiscard},
     {message_bytes_persistent, MsgBytesReady + MsgBytesUnack + MsgBytesDiscard},
     {messages_persistent, M}
     | infos(QName, ?STATISTICS_KEYS -- [consumers])],

@@ -839,8 +848,11 @@ deliver(true, Delivery, QState0) ->
    rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
                               Delivery#delivery.message, QState0).

deliver(QSs, #delivery{confirm = Confirm} = Delivery0) ->
    Delivery = clean_delivery(Delivery0),
deliver(QSs, #delivery{message = #basic_message{content = Content0} = Msg,
                       confirm = Confirm} = Delivery0) ->
    %% TODO: we could also consider clearing out the message id here
    Content = prepare_content(Content0),
    Delivery = Delivery0#delivery{message = Msg#basic_message{content = Content}},
    lists:foldl(
      fun({Q, stateless}, {Qs, Actions}) ->
              QRef = amqqueue:get_pid(Q),

@@ -1253,20 +1265,46 @@ reclaim_memory(Vhost, QueueName) ->
    ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}).

%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
    DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
                                      fun res_arg/2, Q), Q),
    DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
                                 fun res_arg/2, Q),
    {?MODULE, dead_letter_publish, [DLX, DLXRKey, amqqueue:get_name(Q)]}.

init_dlx(undefined, _Q) ->
    undefined;
init_dlx(DLX, Q) when ?is_amqqueue(Q) ->
dead_letter_handler(Q, Overflow) ->
    %% Queue arg continues to take precedence to not break existing configurations
    %% for queues upgraded from <v3.10 to >=v3.10
    Exchange = args_policy_lookup(<<"dead-letter-exchange">>, fun queueArgHasPrecedence/2, Q),
    RoutingKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun queueArgHasPrecedence/2, Q),
    %% Policy takes precedence because it's a new key introduced in v3.10 and we want
    %% users to use policies instead of queue args allowing dynamic reconfiguration.
    %% TODO change to queueArgHasPrecedence for dead-letter-strategy
    Strategy = args_policy_lookup(<<"dead-letter-strategy">>, fun policyHasPrecedence/2, Q),
    QName = amqqueue:get_name(Q),
    rabbit_misc:r(QName, exchange, DLX).
    dlh(Exchange, RoutingKey, Strategy, Overflow, QName).

res_arg(_PolVal, ArgVal) -> ArgVal.
dlh(undefined, undefined, undefined, _, _) ->
    undefined;
dlh(undefined, RoutingKey, undefined, _, QName) ->
    rabbit_log:warning("Disabling dead-lettering for ~s despite configured dead-letter-routing-key '~s' "
                       "because dead-letter-exchange is not configured.",
                       [rabbit_misc:rs(QName), RoutingKey]),
    undefined;
dlh(undefined, _, Strategy, _, QName) ->
    rabbit_log:warning("Disabling dead-lettering for ~s despite configured dead-letter-strategy '~s' "
                       "because dead-letter-exchange is not configured.",
                       [rabbit_misc:rs(QName), Strategy]),
    undefined;
dlh(_, _, <<"at-least-once">>, reject_publish, _) ->
    at_least_once;
dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName) ->
    rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~s "
                       "because configured dead-letter-strategy at-least-once is incompatible with "
                       "effective overflow strategy drop-head. To enable dead-letter-strategy "
                       "at-least-once, set overflow strategy to reject-publish.",
                       [rabbit_misc:rs(QName)]),
    dlh_at_most_once(Exchange, RoutingKey, QName);
dlh(Exchange, RoutingKey, _, _, QName) ->
    dlh_at_most_once(Exchange, RoutingKey, QName).

dlh_at_most_once(Exchange, RoutingKey, QName) ->
    DLX = rabbit_misc:r(QName, exchange, Exchange),
    MFA = {?MODULE, dead_letter_publish, [DLX, RoutingKey, QName]},
    {at_most_once, MFA}.

dead_letter_publish(undefined, _, _, _) ->
    ok;

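Reading the dlh/5 clauses above as a decision table, and assuming it were called directly with placeholder values (<<"dlx">>, <<"rk">>) and a queue resource QName, the resolution works out roughly as follows:

%% dlh(undefined,  undefined, undefined,            _,              QName) -> undefined (dead-lettering disabled)
%% dlh(<<"dlx">>,  <<"rk">>,  <<"at-least-once">>,  reject_publish, QName) -> at_least_once
%% dlh(<<"dlx">>,  <<"rk">>,  <<"at-least-once">>,  drop_head,      QName) -> {at_most_once, MFA}, plus a warning
%% dlh(<<"dlx">>,  undefined, undefined,            drop_head,      QName) -> {at_most_once, MFA}
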
@@ -1438,6 +1476,28 @@ i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
        {timeout, _} ->
            0
    end;
i(messages_dlx, Q) when ?is_amqqueue(Q) ->
    QPid = amqqueue:get_pid(Q),
    case ra:local_query(QPid,
                        fun rabbit_fifo:query_stat_dlx/1) of
        {ok, {_, {Num, _}}, _} ->
            Num;
        {error, _} ->
            0;
        {timeout, _} ->
            0
    end;
i(message_bytes_dlx, Q) when ?is_amqqueue(Q) ->
    QPid = amqqueue:get_pid(Q),
    case ra:local_query(QPid,
                        fun rabbit_fifo:query_stat_dlx/1) of
        {ok, {_, {_, Bytes}}, _} ->
            Bytes;
        {error, _} ->
            0;
        {timeout, _} ->
            0
    end;
i(_K, _Q) -> ''.

open_files(Name) ->

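A rough sketch of reading the same dead-letter counters by hand, assuming QPid is the Ra server id of a quorum queue member as returned by amqqueue:get_pid/1 (the result shape mirrors the pattern matches above):

{ok, {_, {NumDlx, BytesDlx}}, _LeaderId} =
    ra:local_query(QPid, fun rabbit_fifo:query_stat_dlx/1).
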
@@ -1582,7 +1642,7 @@ overflow(undefined, Def, _QName) -> Def;
overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish;
overflow(<<"drop-head">>, _Def, _QName) -> drop_head;
overflow(<<"reject-publish-dlx">> = V, Def, QName) ->
    rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~p",
    rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~s",
                       [V, rabbit_misc:rs(QName)]),
    Def.

@@ -1626,19 +1686,15 @@ notify_decorators(QName, F, A) ->
    end.

%% remove any data that a quorum queue doesn't need
clean_delivery(#delivery{message =
                         #basic_message{content = Content0} = Msg} = Delivery) ->
    Content = case Content0 of
                  #content{properties = none} ->
                      Content0;
                  #content{protocol = none} ->
                      Content0;
                  #content{properties = Props,
                           protocol = Proto} ->
                      Content0#content{properties = none,
                                       properties_bin = Proto:encode_properties(Props)}
              end,

    %% TODO: we could also consider clearing out the message id here
    Delivery#delivery{message = Msg#basic_message{content = Content}}.

prepare_content(#content{properties = none} = Content) ->
    Content;
prepare_content(#content{protocol = none} = Content) ->
    Content;
prepare_content(#content{properties = #'P_basic'{expiration = undefined} = Props,
                         protocol = Proto} = Content) ->
    Content#content{properties = none,
                    properties_bin = Proto:encode_properties(Props)};
prepare_content(Content) ->
    %% expiration is set. Therefore, leave properties decoded so that
    %% rabbit_fifo can directly parse it without having to decode again.
    Content.

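The last prepare_content/1 clause leaves the properties decoded so that rabbit_fifo can read the per-message TTL from the 'expiration' property without decoding again. A small sketch of that parsing step, assuming rabbit_basic:parse_expiration/1 (exported by rabbit_basic) returns {ok, Millis | undefined} for well-formed values:

{ok, 5000}      = rabbit_basic:parse_expiration(#'P_basic'{expiration = <<"5000">>}),
{ok, undefined} = rabbit_basic:parse_expiration(#'P_basic'{expiration = undefined}).
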
@@ -957,7 +957,9 @@ capabilities() ->
                          <<"single-active-consumer">>, <<"delivery-limit">>,
                          <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
                          <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
                          <<"queue-master-locator">>],
                          <<"queue-master-locator">>,
                          %% Quorum policies
                          <<"dead-letter-strategy">>],
      queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
                          <<"x-max-length">>, <<"x-max-length-bytes">>,
                          <<"x-single-active-consumer">>, <<"x-queue-type">>,

@@ -93,7 +93,9 @@ init_per_group(quorum_queue, Config) ->
        ok ->
            rabbit_ct_helpers:set_config(
              Config,
              [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
              [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>},
                             %%TODO add at-least-once tests
                             {<<"x-dead-letter-strategy">>, longstr, <<"at-most-once">>}]},
               {queue_durable, true}]);
        Skip ->
            Skip

@@ -708,7 +710,9 @@ dead_letter_policy(Config) ->
    {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    DLXQName = ?config(queue_name_dlx, Config),
    Args = ?config(queue_args, Config),
    Args0 = ?config(queue_args, Config),
    %% declaring a quorum queue with x-dead-letter-strategy without defining a DLX will fail
    Args = proplists:delete(<<"x-dead-letter-strategy">>, Args0),
    Durable = ?config(queue_durable, Config),
    DLXExchange = ?config(dlx_exchange, Config),

File diff suppressed because it is too large

@@ -112,7 +112,7 @@
-record(basic_message,
        {exchange_name,     %% The exchange where the message was received
         routing_keys = [], %% Routing keys used during publish
         content,           %% The message content
         content,           %% The message #content record
         id,                %% A `rabbit_guid:gen()` generated id
         is_persistent}).   %% Whether the message was published as persistent

@@ -174,7 +174,7 @@ const QUEUE_EXTRA_CONTENT_REQUESTS = [];
// All help ? popups
var HELP = {
    'delivery-limit':
      'The number of allowed unsuccessful delivery attempts. Once a message has been delivered unsuccessfully this many times it will be dropped or dead-lettered, depending on the queue configuration.',
      'The number of allowed unsuccessful delivery attempts. Once a message has been delivered unsuccessfully more than this many times it will be dropped or dead-lettered, depending on the queue configuration.',

    'exchange-auto-delete':
      'If yes, the exchange will delete itself after at least one queue or exchange has been bound to this one, and then all queues or exchanges have been unbound.',

@@ -218,6 +218,9 @@ var HELP = {
    'queue-dead-letter-routing-key':
      'Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message\'s original routing key will be used.<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/dlx.html">x-dead-letter-routing-key</a>" argument.)',

    'queue-dead-letter-strategy':
      'Valid values are <code>at-most-once</code> or <code>at-least-once</code>. It defaults to <code>at-most-once</code>. This setting is understood only by quorum queues. If <code>at-least-once</code> is set, <code>Overflow behaviour</code> must be set to <code>reject-publish</code>. Otherwise, dead letter strategy will fall back to <code>at-most-once</code>.',

    'queue-single-active-consumer':
      'If set, makes sure only one consumer at a time consumes from the queue and fails over to another registered consumer in case the active one is cancelled or dies.<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/consumers.html#single-active-consumer">x-single-active-consumer</a>" argument.)',

@@ -246,11 +249,14 @@ var HELP = {
      'Set the queue initial cluster size.',

    'queue-type':
      'Set the queue type, determining the type of queue to use: raft-based high availability or classic queue. Valid values are <code>quorum</code> or <code>classic</code>. It defaults to <code>classic<code>. <br/>',
      'Set the queue type, determining the type of queue to use: raft-based high availability or classic queue. Valid values are <code>quorum</code> or <code>classic</code>. It defaults to <code>classic</code>. <br/>',

    'queue-messages':
      '<p>Message counts.</p><p>Note that "in memory" and "persistent" are not mutually exclusive; persistent messages can be in memory as well as on disc, and transient messages can be paged out if memory is tight. Non-durable queues will consider all messages to be transient.</p>',

    'queue-dead-lettered':
      'Applies to messages dead-lettered with dead-letter-strategy <code>at-least-once</code>.',

    'queue-message-body-bytes':
      '<p>The sum total of the sizes of the message bodies in this queue. This only counts message bodies; it does not include message properties (including headers) or metadata used by the queue.</p><p>Note that "in memory" and "persistent" are not mutually exclusive; persistent messages can be in memory as well as on disc, and transient messages can be paged out if memory is tight. Non-durable queues will consider all messages to be transient.</p><p>If a message is routed to multiple queues on publication, its body will be stored only once (in memory and on disk) and shared between queues. The value shown here does not take account of this effect.</p>',

@@ -103,7 +103,8 @@
  <span class="argument-link" field="definition" key="overflow" type="string">Overflow behaviour</span> <span class="help" id="queue-overflow"></span> |
  <span class="argument-link" field="definition" key="expires" type="number">Auto expire</span> </br>
  <span class="argument-link" field="definition" key="dead-letter-exchange" type="string">Dead letter exchange</span> |
  <span class="argument-link" field="definition" key="dead-letter-routing-key" type="string">Dead letter routing key</span><br />
  <span class="argument-link" field="definition" key="dead-letter-routing-key" type="string">Dead letter routing key</span><br/>
  <span class="argument-link" field="definition" key="message-ttl" type="number">Message TTL</span><span class="help" id="queue-message-ttl"></span></br>
</td>
<tr>
  <td>Queues [Classic]</td>

@@ -114,7 +115,6 @@
  <span class="argument-link" field="definition" key="ha-promote-on-shutdown" type="string" value="">HA mirror promotion on shutdown</span> <span class="help" id="policy-ha-promote-on-shutdown"></span> |
  <span class="argument-link" field="definition" key="ha-promote-on-failure" type="string" value="">HA mirror promotion on failure</span> <span class="help" id="policy-ha-promote-on-failure"></span>
  </br>
  <span class="argument-link" field="definition" key="message-ttl" type="number">Message TTL</span> |
  <span class="argument-link" field="definition" key="queue-mode" type="string" value="lazy">Lazy mode</span> |
  <span class="argument-link" field="definition" key="queue-version" type="number">Version</span> <span class="help" id="queue-version"></span> |
  <span class="argument-link" field="definition" key="queue-master-locator" type="string">Master Locator</span></br>

@@ -128,7 +128,9 @@
  <span class="argument-link" field="definition" key="max-in-memory-bytes" type="number">Max in memory bytes</span>
  <span class="help" id="queue-max-in-memory-bytes"></span> |
  <span class="argument-link" field="definition" key="delivery-limit" type="number">Delivery limit</span>
  <span class="help" id="delivery-limit"></span>
  <span class="help" id="delivery-limit"></span></br>
  <span class="argument-link" field="definition" key="dead-letter-strategy" type="string">Dead letter strategy</span>
  <span class="help" id="queue-dead-letter-strategy"></span>
</td>
</tr>
<tr>

@@ -271,13 +273,14 @@
  <span class="argument-link" field="definitionop" key="max-length" type="number">Max length</span> |
  <span class="argument-link" field="definitionop" key="max-length-bytes" type="number">Max length bytes</span> |
  <span class="argument-link" field="definitionop" key="overflow" type="string">Overflow behaviour</span>
  <span class="help" id="queue-overflow"></span>
  <span class="help" id="queue-overflow"></span></br>
  <span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span>
  <span class="help" id="queue-message-ttl"></span>
</td>
</tr>
<tr>
  <td>Queues [Classic]</td>
  <td>
    <span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span> |
    <span class="argument-link" field="definitionop" key="expires" type="number">Auto expire</span>
  </td>
</tr>

@@ -167,6 +167,9 @@
  <th class="horizontal">Unacked</th>
  <% if (is_quorum(queue)) { %>
    <th class="horizontal">In memory ready</th>
    <th class="horizontal">Dead-lettered
      <span class="help" id="queue-dead-lettered"></span>
    </th>
  <% } %>
  <% if (is_classic(queue)) { %>
    <th class="horizontal">In memory</th>

@@ -192,6 +195,9 @@
    <td class="r">
      <%= fmt_num_thousands(queue.messages_ram) %>
    </td>
    <td class="r">
      <%= fmt_num_thousands(queue.messages_dlx) %>
    </td>
  <% } %>
  <% if (is_classic(queue)) { %>
    <td class="r">

@@ -224,6 +230,11 @@
      <%= fmt_bytes(queue.message_bytes_ram) %>
    </td>
  <% } %>
  <% if (is_quorum(queue)) { %>
    <td class="r">
      <%= fmt_bytes(queue.message_bytes_dlx) %>
    </td>
  <% } %>
  <% if (is_classic(queue)) { %>
    <td class="r">
      <%= fmt_bytes(queue.message_bytes_persistent) %>

@@ -312,13 +312,11 @@
<tr>
  <td>Add</td>
  <td>
    <% if (queue_type == "classic") { %>
      <span class="argument-link" field="arguments" key="x-message-ttl" type="number">Message TTL</span> <span class="help" id="queue-message-ttl"></span> |
    <% } %>
    <% if (queue_type != "stream") { %>
      <span class="argument-link" field="arguments" key="x-expires" type="number">Auto expire</span> <span class="help" id="queue-expires"></span> |
      <span class="argument-link" field="arguments" key="x-overflow" type="string">Overflow behaviour</span> <span class="help" id="queue-overflow"></span> |
      <span class="argument-link" field="arguments" key="x-single-active-consumer" type="boolean">Single active consumer</span> <span class="help" id="queue-single-active-consumer"></span><br/>
      <span class="argument-link" field="arguments" key="x-message-ttl" type="number">Message TTL</span> <span class="help" id="queue-message-ttl"></span> |
      <span class="argument-link" field="arguments" key="x-overflow" type="string">Overflow behaviour</span> <span class="help" id="queue-overflow"></span><br/>
      <span class="argument-link" field="arguments" key="x-single-active-consumer" type="boolean">Single active consumer</span> <span class="help" id="queue-single-active-consumer"></span> |
      <span class="argument-link" field="arguments" key="x-dead-letter-exchange" type="string">Dead letter exchange</span> <span class="help" id="queue-dead-letter-exchange"></span> |
      <span class="argument-link" field="arguments" key="x-dead-letter-routing-key" type="string">Dead letter routing key</span> <span class="help" id="queue-dead-letter-routing-key"></span><br/>
      <span class="argument-link" field="arguments" key="x-max-length" type="number">Max length</span> <span class="help" id="queue-max-length"></span> |

@@ -334,7 +332,8 @@
      <span class="argument-link" field="arguments" key="x-delivery-limit" type="number">Delivery limit</span><span class="help" id="delivery-limit"></span>
      | <span class="argument-link" field="arguments" key="x-max-in-memory-length" type="number">Max in memory length</span><span class="help" id="queue-max-in-memory-length"></span>
      | <span class="argument-link" field="arguments" key="x-max-in-memory-bytes" type="number">Max in memory bytes</span><span class="help" id="queue-max-in-memory-bytes"></span>
      | <span class="argument-link" field="arguments" key="x-quorum-initial-group-size" type="number">Initial cluster size</span><span class="help" id="queue-quorum-initial-group-size"></span><br/>
      | <span class="argument-link" field="arguments" key="x-quorum-initial-group-size" type="number">Initial cluster size</span><span class="help" id="queue-initial-cluster-size"></span><br/>
      <span class="argument-link" field="arguments" key="x-dead-letter-strategy" type="string">Dead letter strategy</span><span class="help" id="queue-dead-letter-strategy"></span><br/>
    <% } %>
    <% if (queue_type == "stream") { %>
      <span class="argument-link" field="arguments" key="x-max-age" type="string">Max time retention</span><span class="help" id="queue-max-age"></span>