Delete commented quorum queue in-memory tests
This commit is contained in:
		
							parent
							
								
									fd2023a118
								
							
						
					
					
						commit
						2db128f683
					
				| 
						 | 
				
			
			@ -1,10 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public
 | 
			
		||||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
%%
 | 
			
		||||
%%
 | 
			
		||||
 | 
			
		||||
%% before post gc 1M msg: 203MB, after recovery + gc: 203MB
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
%% macros for memory optimised tuple structures
 | 
			
		||||
%% [A|B] saves 1 byte compared to {A,B}
 | 
			
		||||
-define(TUPLE(A, B), [A | B]).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -695,11 +695,6 @@ maybe_add_action(Action, Acc, State) ->
 | 
			
		|||
    %% anything else is assumed to be an action
 | 
			
		||||
    {[Action | Acc], State}.
 | 
			
		||||
 | 
			
		||||
% do_resends(From, To, State) when From =< To ->
 | 
			
		||||
%     lists:foldl(fun resend/2, State, lists:seq(From, To));
 | 
			
		||||
% do_resends(_, _, State) ->
 | 
			
		||||
%     State.
 | 
			
		||||
 | 
			
		||||
% resends a command with a new sequence number
 | 
			
		||||
resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
 | 
			
		||||
    case maps:take(OldSeq, Pending0) of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx).
 | 
			
		||||
 | 
			
		||||
-include("rabbit_fifo_dlx.hrl").
 | 
			
		||||
| 
						 | 
				
			
			@ -29,7 +35,7 @@
 | 
			
		|||
         }).
 | 
			
		||||
-record(settle, {msg_ids :: [msg_id()]}).
 | 
			
		||||
-type protocol() :: {dlx, #checkout{} | #settle{}}.
 | 
			
		||||
-type state() :: #?MODULE{}.
 | 
			
		||||
-opaque state() :: #?MODULE{}.
 | 
			
		||||
-export_type([state/0,
 | 
			
		||||
              protocol/0]).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -37,11 +43,13 @@
 | 
			
		|||
init() ->
 | 
			
		||||
    #?MODULE{}.
 | 
			
		||||
 | 
			
		||||
-spec make_checkout(pid(), non_neg_integer()) -> protocol().
 | 
			
		||||
make_checkout(Pid, NumUnsettled) ->
 | 
			
		||||
    {dlx, #checkout{consumer = Pid,
 | 
			
		||||
                    prefetch = NumUnsettled
 | 
			
		||||
                   }}.
 | 
			
		||||
 | 
			
		||||
-spec make_settle([msg_id()]) -> protocol().
 | 
			
		||||
make_settle(MessageIds) when is_list(MessageIds) ->
 | 
			
		||||
    {dlx, #settle{msg_ids = MessageIds}}.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -132,8 +140,7 @@ apply(_, Cmd, DLH, State) ->
 | 
			
		|||
    rabbit_log:debug("Ignoring command ~p for dead_letter_handler ~p", [Cmd, DLH]),
 | 
			
		||||
    {State, []}.
 | 
			
		||||
 | 
			
		||||
-spec discard([msg()], rabbit_dead_letter:reason(),
 | 
			
		||||
              dead_letter_handler(), state()) ->
 | 
			
		||||
-spec discard([msg()], rabbit_dead_letter:reason(), dead_letter_handler(), state()) ->
 | 
			
		||||
    {state(), ra_machine:effects()}.
 | 
			
		||||
discard(Msgs, Reason, undefined, State) ->
 | 
			
		||||
    {State, [{mod_call, rabbit_global_counters, messages_dead_lettered,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-record(dlx_consumer,{
 | 
			
		||||
          pid :: pid(),
 | 
			
		||||
          prefetch :: non_neg_integer(),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx_client).
 | 
			
		||||
 | 
			
		||||
-export([checkout/3, settle/2, handle_ra_event/3,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx_sup).
 | 
			
		||||
 | 
			
		||||
-behaviour(supervisor).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,15 +1,25 @@
 | 
			
		|||
%% This module consumes from a single quroum queue's discards queue (containing dead-letttered messages)
 | 
			
		||||
%% and forwards the DLX messages at least once to every target queue.
 | 
			
		||||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
%% One rabbit_fifo_dlx_worker process exists per (source) quorum queue that has at-least-once dead lettering
 | 
			
		||||
%% enabled. The rabbit_fifo_dlx_worker process is co-located on the quorum queue leader node.
 | 
			
		||||
%% Its job is to consume from the quorum queue's 'discards' queue (containing dead lettered messages)
 | 
			
		||||
%% and to forward each dead lettered message at least once to every target queue.
 | 
			
		||||
%% This is in contrast to at-most-once semantics of rabbit_dead_letter:publish/5 which is
 | 
			
		||||
%% the only option for classic queues and was the only option for quorum queues in RMQ <= v3.9
 | 
			
		||||
%%
 | 
			
		||||
%% Some parts of this module resemble the channel process in the sense that it needs to keep track what messages
 | 
			
		||||
%% are consumed but not acked yet and what messages are published but not confirmed yet.
 | 
			
		||||
%% Compared to the channel process, this module is protocol independent since it doesn't deal with AMQP clients.
 | 
			
		||||
%% Compared to the channel process, this module is protocol independent since it does not deal with AMQP clients.
 | 
			
		||||
%%
 | 
			
		||||
%% This module consumes directly from the rabbit_fifo_dlx_client bypassing the rabbit_queue_type interface,
 | 
			
		||||
%% but publishes via the rabbit_queue_type interface.
 | 
			
		||||
%% While consuming via rabbit_queue_type interface would have worked in practice (by using a special consumer argument,
 | 
			
		||||
%% e.g. {<<"x-internal-queue">>, longstr, <<"discards">>}) using the rabbit_fifo_dlx_client directly provides
 | 
			
		||||
%% separation of concerns making things much easier to test, to debug, and to understand.
 | 
			
		||||
%% separation of concerns making things easier to test, to debug, and to understand.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx_worker).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -55,22 +65,23 @@
 | 
			
		|||
         }).
 | 
			
		||||
 | 
			
		||||
-record(state, {
 | 
			
		||||
          %% There is one rabbit_fifo_dlx_worker per source quorum queue
 | 
			
		||||
          %% (if dead-letter-strategy at-least-once is used).
 | 
			
		||||
          %% source queue
 | 
			
		||||
          queue_ref :: rabbit_amqqueue:name(),
 | 
			
		||||
          %% monitors source queue
 | 
			
		||||
          monitor_ref :: reference(),
 | 
			
		||||
          %% configured (x-)dead-letter-exchange of source queue
 | 
			
		||||
          exchange_ref,
 | 
			
		||||
          exchange_ref :: rabbit_exchange:name() | undefined,
 | 
			
		||||
          %% configured (x-)dead-letter-routing-key of source queue
 | 
			
		||||
          routing_key,
 | 
			
		||||
          %% client of source queue
 | 
			
		||||
          dlx_client_state :: rabbit_fifo_dlx_client:state(),
 | 
			
		||||
          %% clients of target queues
 | 
			
		||||
          queue_type_state :: rabbit_queue_type:state(),
 | 
			
		||||
          %% Consumed messages for which we are awaiting publisher confirms.
 | 
			
		||||
          pendings = #{} :: #{OutSeq :: non_neg_integer() => #pending{}},
 | 
			
		||||
          %% Consumed message IDs for which we received all publisher confirms.
 | 
			
		||||
          settled_ids = [] :: [non_neg_integer()],
 | 
			
		||||
          %% next publisher confirm delivery tag sequence number
 | 
			
		||||
          %% next outgoing message sequence number
 | 
			
		||||
          next_out_seq = 1,
 | 
			
		||||
          %% If no publisher confirm was received for at least settle_timeout milliseconds, message will be redelivered.
 | 
			
		||||
          %% To prevent duplicates in the target queue and to ensure message will eventually be acked to the source queue,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,6 @@
 | 
			
		|||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
%%
 | 
			
		||||
 | 
			
		||||
-module(quorum_queue_SUITE).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -122,15 +121,7 @@ all_tests() ->
 | 
			
		|||
     subscribe_redelivery_limit,
 | 
			
		||||
     subscribe_redelivery_policy,
 | 
			
		||||
     subscribe_redelivery_limit_with_dead_letter,
 | 
			
		||||
     % queue_length_in_memory_limit_basic_get,
 | 
			
		||||
     % queue_length_in_memory_limit_subscribe,
 | 
			
		||||
     % queue_length_in_memory_limit,
 | 
			
		||||
     % queue_length_in_memory_limit_returns,
 | 
			
		||||
     % queue_length_in_memory_bytes_limit_basic_get,
 | 
			
		||||
     % queue_length_in_memory_bytes_limit_subscribe,
 | 
			
		||||
     % queue_length_in_memory_bytes_limit,
 | 
			
		||||
     % queue_length_in_memory_purge,
 | 
			
		||||
     % in_memory,
 | 
			
		||||
     purge,
 | 
			
		||||
     consumer_metrics,
 | 
			
		||||
     invalid_policy,
 | 
			
		||||
     delete_if_empty,
 | 
			
		||||
| 
						 | 
				
			
			@ -2270,287 +2261,25 @@ queue_length_limit_reject_publish(Config) ->
 | 
			
		|||
    ok = publish_confirm(Ch, QQ),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_limit_basic_get(Config) ->
 | 
			
		||||
purge(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-length">>, long, 1}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    ok = amqp_channel:cast(Ch,
 | 
			
		||||
                           #'basic.publish'{routing_key = QQ},
 | 
			
		||||
                           #amqp_msg{props   = #'P_basic'{delivery_mode = 2},
 | 
			
		||||
                                     payload = Msg1}),
 | 
			
		||||
    ok = amqp_channel:cast(Ch,
 | 
			
		||||
                           #'basic.publish'{routing_key = QQ},
 | 
			
		||||
                           #amqp_msg{props   = #'P_basic'{delivery_mode = 2},
 | 
			
		||||
                                     payload = <<"msg2">>}),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{1, byte_size(Msg1)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})),
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})).
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_limit_subscribe(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-length">>, long, 1}])),
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{1, byte_size(Msg1)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
    {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
 | 
			
		||||
 | 
			
		||||
    subscribe(Ch, QQ, false),
 | 
			
		||||
    receive
 | 
			
		||||
        {#'basic.deliver'{delivery_tag = DeliveryTag1,
 | 
			
		||||
                          redelivered  = false},
 | 
			
		||||
         #amqp_msg{payload = Msg1}} ->
 | 
			
		||||
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
 | 
			
		||||
                                               multiple     = false})
 | 
			
		||||
    end,
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
    receive
 | 
			
		||||
        {#'basic.deliver'{delivery_tag = DeliveryTag2,
 | 
			
		||||
                          redelivered  = false},
 | 
			
		||||
         #amqp_msg{payload = Msg2}} ->
 | 
			
		||||
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
 | 
			
		||||
                                               multiple     = false})
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_limit(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-length">>, long, 2}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
    Msg3 = <<"msg111">>,
 | 
			
		||||
    Msg4 = <<"msg1111">>,
 | 
			
		||||
    Msg5 = <<"msg1111">>,
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    publish(Ch, QQ, Msg3),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
    publish(Ch, QQ, Msg4),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
    publish(Ch, QQ, Msg5),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
 | 
			
		||||
    ExpectedMsgs = [Msg2, Msg3, Msg4, Msg5],
 | 
			
		||||
    validate_queue(Ch, QQ, ExpectedMsgs),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_limit_returns(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-length">>, long, 2}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
    Msg3 = <<"msg111">>,
 | 
			
		||||
    Msg4 = <<"msg111">>,
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = false})),
 | 
			
		||||
 | 
			
		||||
    {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = Msg2}} =
 | 
			
		||||
        amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                           no_ack = false}),
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg3),
 | 
			
		||||
    publish(Ch, QQ, Msg4),
 | 
			
		||||
 | 
			
		||||
    %% Ensure that returns are subject to in memory limits too
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"4">>, <<"2">>, <<"2">>]]),
 | 
			
		||||
    amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
 | 
			
		||||
                                        multiple     = true,
 | 
			
		||||
                                        requeue      = true}),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg3) + byte_size(Msg4)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_bytes_limit_basic_get(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-bytes">>, long, 6}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    ok = amqp_channel:cast(Ch,
 | 
			
		||||
                           #'basic.publish'{routing_key = QQ},
 | 
			
		||||
                           #amqp_msg{props   = #'P_basic'{delivery_mode = 2},
 | 
			
		||||
                                     payload = Msg1}),
 | 
			
		||||
    ok = amqp_channel:cast(Ch,
 | 
			
		||||
                           #'basic.publish'{routing_key = QQ},
 | 
			
		||||
                           #amqp_msg{props   = #'P_basic'{delivery_mode = 2},
 | 
			
		||||
                                     payload = <<"msg2">>}),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{1, byte_size(Msg1)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})),
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})).
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_bytes_limit_subscribe(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-bytes">>, long, 6}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{1, byte_size(Msg1)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    subscribe(Ch, QQ, false),
 | 
			
		||||
    receive
 | 
			
		||||
        {#'basic.deliver'{delivery_tag = DeliveryTag1,
 | 
			
		||||
                          redelivered  = false},
 | 
			
		||||
         #amqp_msg{payload = Msg1}} ->
 | 
			
		||||
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
 | 
			
		||||
                                               multiple     = false})
 | 
			
		||||
    end,
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
    receive
 | 
			
		||||
        {#'basic.deliver'{delivery_tag = DeliveryTag2,
 | 
			
		||||
                          redelivered  = false},
 | 
			
		||||
         #amqp_msg{payload = Msg2}} ->
 | 
			
		||||
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
 | 
			
		||||
                                               multiple     = false})
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_bytes_limit(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-bytes">>, long, 12}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
    Msg3 = <<"msg111">>,
 | 
			
		||||
    Msg4 = <<"msg1111">>,
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    publish(Ch, QQ, Msg3),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
 | 
			
		||||
                 amqp_channel:call(Ch, #'basic.get'{queue = QQ,
 | 
			
		||||
                                                    no_ack = true})),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
 | 
			
		||||
    publish(Ch, QQ, Msg4),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
 | 
			
		||||
 | 
			
		||||
queue_length_in_memory_purge(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
 | 
			
		||||
                                  {<<"x-max-in-memory-length">>, long, 2}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
    Msg3 = <<"msg111">>,
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
    publish(Ch, QQ, Msg3),
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    {'queue.purge_ok', 3} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
 | 
			
		||||
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
 | 
			
		||||
    ?assertEqual([0], dirty_query([Server], RaName, fun rabbit_fifo:query_messages_total/1)).
 | 
			
		||||
 | 
			
		||||
peek(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
| 
						 | 
				
			
			@ -2673,46 +2402,6 @@ per_message_ttl_mixed_expiry(Config) ->
 | 
			
		|||
    end,
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
in_memory(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
 | 
			
		||||
    QQ = ?config(queue_name, Config),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
 | 
			
		||||
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 | 
			
		||||
 | 
			
		||||
    RaName = ra_name(QQ),
 | 
			
		||||
    Msg1 = <<"msg1">>,
 | 
			
		||||
    Msg2 = <<"msg11">>,
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg1),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
 | 
			
		||||
    ?assertEqual([{1, byte_size(Msg1)}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    subscribe(Ch, QQ, false),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    publish(Ch, QQ, Msg2),
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]),
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
 | 
			
		||||
 | 
			
		||||
    receive
 | 
			
		||||
        {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} ->
 | 
			
		||||
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
 | 
			
		||||
                                               multiple     = false})
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
 | 
			
		||||
    ?assertEqual([{0, 0}],
 | 
			
		||||
                 dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
 | 
			
		||||
 | 
			
		||||
consumer_metrics(Config) ->
 | 
			
		||||
    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +1,14 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx_SUITE).
 | 
			
		||||
 | 
			
		||||
-compile(nowarn_export_all).
 | 
			
		||||
-compile(export_all).
 | 
			
		||||
 | 
			
		||||
% -include_lib("common_test/include/ct.hrl").
 | 
			
		||||
-include_lib("eunit/include/eunit.hrl").
 | 
			
		||||
-include_lib("rabbit/src/rabbit_fifo.hrl").
 | 
			
		||||
-include_lib("rabbit/src/rabbit_fifo_dlx.hrl").
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,9 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
 | 
			
		||||
 | 
			
		||||
-module(rabbit_fifo_dlx_integration_SUITE).
 | 
			
		||||
 | 
			
		||||
%% Integration tests for at-least-once dead-lettering comprising mainly
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue