Enable adding queues with plugins, core
This commit is contained in:
		
							parent
							
								
									dc0d4735c2
								
							
						
					
					
						commit
						3c1a890a0a
					
				| 
						 | 
				
			
			@ -48,6 +48,10 @@
 | 
			
		|||
        (?is_amqqueue_v2(Q) andalso
 | 
			
		||||
         ?amqqueue_v2_field_type(Q) =:= Type)).
 | 
			
		||||
 | 
			
		||||
-define(amqqueue_type(Q),
 | 
			
		||||
        (?is_amqqueue_v2(Q) andalso
 | 
			
		||||
         ?amqqueue_v2_field_type(Q))).
 | 
			
		||||
 | 
			
		||||
-define(amqqueue_has_valid_pid(Q),
 | 
			
		||||
        (?is_amqqueue_v2(Q) andalso
 | 
			
		||||
         is_pid(?amqqueue_v2_field_pid(Q)))).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -368,6 +368,7 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
 | 
			
		|||
 | 
			
		||||
-spec get_leader(amqqueue_v2()) -> node().
 | 
			
		||||
 | 
			
		||||
%% TODO: not only qqs can have leaders, dispatch via queue type
 | 
			
		||||
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
 | 
			
		||||
 | 
			
		||||
% operator_policy
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -461,32 +461,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
 | 
			
		|||
-spec queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
queue_topology(Q) ->
 | 
			
		||||
    case amqqueue:get_type(Q) of
 | 
			
		||||
        rabbit_quorum_queue ->
 | 
			
		||||
            [{leader, Leader0},
 | 
			
		||||
             {members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
 | 
			
		||||
            Leader = case Leader0 of
 | 
			
		||||
                         '' -> undefined;
 | 
			
		||||
                         _ -> Leader0
 | 
			
		||||
                     end,
 | 
			
		||||
            {Leader, Members};
 | 
			
		||||
        rabbit_stream_queue ->
 | 
			
		||||
            #{name := StreamId} = amqqueue:get_type_state(Q),
 | 
			
		||||
            case rabbit_stream_coordinator:members(StreamId) of
 | 
			
		||||
                {ok, Members} ->
 | 
			
		||||
                    maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
 | 
			
		||||
                                      {Node, [Node | Replicas]};
 | 
			
		||||
                                 (Node, {_Pid, replica}, {Writer, Replicas}) ->
 | 
			
		||||
                                      {Writer, [Node | Replicas]}
 | 
			
		||||
                              end, {undefined, []}, Members);
 | 
			
		||||
                {error, _} ->
 | 
			
		||||
                    {undefined, undefined}
 | 
			
		||||
            end;
 | 
			
		||||
        _ ->
 | 
			
		||||
            Pid = amqqueue:get_pid(Q),
 | 
			
		||||
            Node = node(Pid),
 | 
			
		||||
            {Node, [Node]}
 | 
			
		||||
    end.
 | 
			
		||||
    Type = amqqueue:get_type(Q),
 | 
			
		||||
    Type:queue_topology(Q).
 | 
			
		||||
 | 
			
		||||
decode_exchange({map, KVList}) ->
 | 
			
		||||
    M = lists:foldl(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->
 | 
			
		|||
 | 
			
		||||
-spec stop(rabbit_types:vhost()) -> 'ok'.
 | 
			
		||||
stop(VHost) ->
 | 
			
		||||
    %% Classic queues
 | 
			
		||||
    ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
 | 
			
		||||
    {ok, BQ} = application:get_env(rabbit, backing_queue_module),
 | 
			
		||||
    ok = BQ:stop(VHost),
 | 
			
		||||
    rabbit_quorum_queue:stop(VHost).
 | 
			
		||||
    rabbit_queue_type:stop(VHost).
 | 
			
		||||
 | 
			
		||||
-spec start([amqqueue:amqqueue()]) -> 'ok'.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -424,6 +420,8 @@ rebalance(Type, VhostSpec, QueueSpec) ->
 | 
			
		|||
    %% We have not yet acquired the rebalance_queues global lock.
 | 
			
		||||
    maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
 | 
			
		||||
 | 
			
		||||
%% TODO: classic queues do not support rebalancing, it looks like they are simply
 | 
			
		||||
%% filtered out with is_replicated(Q). Maybe error instead?
 | 
			
		||||
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
 | 
			
		||||
    rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
 | 
			
		||||
                    [Type, VhostSpec, QueueSpec]),
 | 
			
		||||
| 
						 | 
				
			
			@ -459,10 +457,15 @@ filter_per_type(stream, Q) ->
 | 
			
		|||
filter_per_type(classic, Q) ->
 | 
			
		||||
    ?amqqueue_is_classic(Q).
 | 
			
		||||
 | 
			
		||||
rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
 | 
			
		||||
    rabbit_quorum_queue;
 | 
			
		||||
rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
 | 
			
		||||
    rabbit_stream_queue.
 | 
			
		||||
%% TODO: note that it can return {error, not_supported}.
 | 
			
		||||
%% this will result in a badmatch. However that's fine
 | 
			
		||||
%% for now because the original function will fail with
 | 
			
		||||
%% bad clause if called with classical queue.
 | 
			
		||||
%% The assumption is all non-replicated queues
 | 
			
		||||
%% are filtered before calling this with is_replicated/0
 | 
			
		||||
rebalance_module(Q) ->
 | 
			
		||||
    TypeModule = ?amqqueue_type(Q),
 | 
			
		||||
    TypeModule:rebalance_module().
 | 
			
		||||
 | 
			
		||||
get_resource_name(#resource{name = Name}) ->
 | 
			
		||||
    Name.
 | 
			
		||||
| 
						 | 
				
			
			@ -487,13 +490,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
 | 
			
		|||
maybe_migrate(ByNode, MaxQueuesDesired) ->
 | 
			
		||||
    maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
 | 
			
		||||
 | 
			
		||||
%% TODO: unfortunate part - UI bits mixed deep inside logic.
 | 
			
		||||
%% I will not be moving this inside queue type. Instead
 | 
			
		||||
%% an attempt to generate something more readable than
 | 
			
		||||
%% Other made.
 | 
			
		||||
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
 | 
			
		||||
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
 | 
			
		||||
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
 | 
			
		||||
column_name(Other) -> Other.
 | 
			
		||||
column_name(TypeModule) ->
 | 
			
		||||
    Alias = rabbit_queue_type:short_alias_of(TypeModule),
 | 
			
		||||
    <<"Number of \"", Alias/binary, "\" queues">>.
 | 
			
		||||
 | 
			
		||||
maybe_migrate(ByNode, _, []) ->
 | 
			
		||||
    ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
 | 
			
		||||
    ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_type(Q)) end, Queues) end, ByNode),
 | 
			
		||||
    CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
 | 
			
		||||
    {ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
 | 
			
		||||
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -1252,14 +1261,12 @@ list_durable() ->
 | 
			
		|||
 | 
			
		||||
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
 | 
			
		||||
 | 
			
		||||
list_by_type(classic) -> list_by_type(rabbit_classic_queue);
 | 
			
		||||
list_by_type(quorum)  -> list_by_type(rabbit_quorum_queue);
 | 
			
		||||
list_by_type(stream)  -> list_by_type(rabbit_stream_queue);
 | 
			
		||||
list_by_type(Type) ->
 | 
			
		||||
    rabbit_db_queue:get_all_durable_by_type(Type).
 | 
			
		||||
list_by_type(TypeDescriptor) ->
 | 
			
		||||
    TypeModule = rabbit_queue_type:discover(TypeDescriptor),
 | 
			
		||||
    rabbit_db_queue:get_all_durable_by_type(TypeModule).
 | 
			
		||||
 | 
			
		||||
%% TODO: looks unused
 | 
			
		||||
-spec list_local_quorum_queue_names() -> [name()].
 | 
			
		||||
 | 
			
		||||
list_local_quorum_queue_names() ->
 | 
			
		||||
    [ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
 | 
			
		||||
           amqqueue:get_state(Q) =/= crashed,
 | 
			
		||||
| 
						 | 
				
			
			@ -1296,6 +1303,7 @@ list_local_followers() ->
 | 
			
		|||
         rabbit_quorum_queue:is_recoverable(Q)
 | 
			
		||||
         ].
 | 
			
		||||
 | 
			
		||||
%% TODO: looks unused
 | 
			
		||||
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
 | 
			
		||||
list_local_quorum_queues_with_name_matching(Pattern) ->
 | 
			
		||||
    [ Q || Q <- list_by_type(quorum),
 | 
			
		||||
| 
						 | 
				
			
			@ -1882,11 +1890,9 @@ run_backing_queue(QPid, Mod, Fun) ->
 | 
			
		|||
 | 
			
		||||
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
 | 
			
		||||
 | 
			
		||||
is_replicated(Q) when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
    false;
 | 
			
		||||
is_replicated(_Q) ->
 | 
			
		||||
    %% streams and quorum queues are all replicated
 | 
			
		||||
    true.
 | 
			
		||||
is_replicated(Q) ->
 | 
			
		||||
    TypeModule = ?amqqueue_type(Q),
 | 
			
		||||
    TypeModule:is_replicated().
 | 
			
		||||
 | 
			
		||||
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
 | 
			
		||||
    false;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,6 +4,8 @@
 | 
			
		|||
%%
 | 
			
		||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
 | 
			
		||||
%%
 | 
			
		||||
%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
 | 
			
		||||
%%
 | 
			
		||||
 | 
			
		||||
-module(rabbit_boot_steps).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1265,11 +1265,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
 | 
			
		|||
            ?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
 | 
			
		||||
            {reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
 | 
			
		||||
        {error, {unsupported, single_active_consumer}} ->
 | 
			
		||||
            rabbit_misc:protocol_error(
 | 
			
		||||
              resource_locked,
 | 
			
		||||
              "cannot obtain access to locked ~ts. basic.get operations "
 | 
			
		||||
              "are not supported by quorum queues with single active consumer",
 | 
			
		||||
              [rabbit_misc:rs(QueueName)]);
 | 
			
		||||
             rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
 | 
			
		||||
        {error, Reason} ->
 | 
			
		||||
            %% TODO add queue type to error message
 | 
			
		||||
            rabbit_misc:protocol_error(internal_error,
 | 
			
		||||
| 
						 | 
				
			
			@ -2005,6 +2001,7 @@ foreach_per_queue(_F, [], Acc) ->
 | 
			
		|||
foreach_per_queue(F, [#pending_ack{tag = CTag,
 | 
			
		||||
                                   queue = QName,
 | 
			
		||||
                                   msg_id = MsgId}], Acc) ->
 | 
			
		||||
    %% TODO: fix this abstraction leak
 | 
			
		||||
    %% quorum queue, needs the consumer tag
 | 
			
		||||
    F({QName, CTag}, [MsgId], Acc);
 | 
			
		||||
foreach_per_queue(F, UAL, Acc) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -2032,6 +2029,7 @@ notify_limiter(Limiter, Acked) ->
 | 
			
		|||
     case rabbit_limiter:is_active(Limiter) of
 | 
			
		||||
        false -> ok;
 | 
			
		||||
        true  -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
 | 
			
		||||
                                          %% TODO: fix absctraction leak
 | 
			
		||||
                                          %% Quorum queues use integer CTags
 | 
			
		||||
                                          %% classic queues use binaries
 | 
			
		||||
                                          %% Quorum queues do not interact
 | 
			
		||||
| 
						 | 
				
			
			@ -2792,3 +2790,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
 | 
			
		|||
 | 
			
		||||
is_global_qos_permitted() ->
 | 
			
		||||
    rabbit_deprecated_features:is_permitted(global_qos).
 | 
			
		||||
 | 
			
		||||
-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
 | 
			
		||||
unsupported_single_active_consumer_error(Q) ->
 | 
			
		||||
    rabbit_misc:protocol_error(
 | 
			
		||||
      resource_locked,
 | 
			
		||||
      "cannot obtain access to locked ~ts. basic.get operations "
 | 
			
		||||
      "are not supported by ~p queues with single active consumer",
 | 
			
		||||
      [rabbit_misc:rs(amqqueue:get_name(Q)),
 | 
			
		||||
       rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -64,8 +64,32 @@
 | 
			
		|||
         send_drained_credit_api_v1/4,
 | 
			
		||||
         send_credit_reply/7]).
 | 
			
		||||
 | 
			
		||||
-export([queue_topology/1,
 | 
			
		||||
         feature_flag_name/0,
 | 
			
		||||
         policy_apply_to_name/0,
 | 
			
		||||
         can_redeliver/0,
 | 
			
		||||
         stop/1,
 | 
			
		||||
         is_replicated/0,
 | 
			
		||||
         rebalance_module/0,
 | 
			
		||||
         list_with_minimum_quorum/0,
 | 
			
		||||
         drain/1,
 | 
			
		||||
         revive/0,
 | 
			
		||||
         queue_vm_stats_sups/0,
 | 
			
		||||
         queue_vm_ets/0,
 | 
			
		||||
         dir_base/0]).
 | 
			
		||||
 | 
			
		||||
-export([validate_policy/1]).
 | 
			
		||||
 | 
			
		||||
-rabbit_boot_step(
 | 
			
		||||
   {rabbit_classic_queue_type,
 | 
			
		||||
    [{description, "Classic queue: queue type"},
 | 
			
		||||
     {mfa,      {rabbit_registry, register,
 | 
			
		||||
                    [queue, <<"classic">>, ?MODULE]}},
 | 
			
		||||
     {cleanup,  {rabbit_registry, unregister,
 | 
			
		||||
                 [queue, <<"classic">>]}},
 | 
			
		||||
     {requires, rabbit_registry},
 | 
			
		||||
     {enables,     ?MODULE}]}).
 | 
			
		||||
 | 
			
		||||
-rabbit_boot_step(
 | 
			
		||||
   {?MODULE,
 | 
			
		||||
    [{description, "Deprecated queue-master-locator support."
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +98,7 @@
 | 
			
		|||
            [policy_validator, <<"queue-master-locator">>, ?MODULE]}},
 | 
			
		||||
     {mfa, {rabbit_registry, register,
 | 
			
		||||
            [operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}},
 | 
			
		||||
     {requires, rabbit_registry},
 | 
			
		||||
     {requires, [rabbit_classic_queue_type]},
 | 
			
		||||
     {enables, recovery}]}).
 | 
			
		||||
 | 
			
		||||
validate_policy(Args) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -674,3 +698,56 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
 | 
			
		|||
 | 
			
		||||
send_queue_event(Pid, QName, Event) ->
 | 
			
		||||
    gen_server:cast(Pid, {queue_event, QName, Event}).
 | 
			
		||||
 | 
			
		||||
-spec queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
queue_topology(Q) ->
 | 
			
		||||
    Pid = amqqueue:get_pid(Q),
 | 
			
		||||
    Node = node(Pid),
 | 
			
		||||
    {Node, [Node]}.
 | 
			
		||||
 | 
			
		||||
feature_flag_name() ->
 | 
			
		||||
    undefined.
 | 
			
		||||
 | 
			
		||||
policy_apply_to_name() ->
 | 
			
		||||
    <<"classic_queues">>.
 | 
			
		||||
 | 
			
		||||
can_redeliver() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
stop(VHost) ->
 | 
			
		||||
    ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
 | 
			
		||||
    {ok, BQ} = application:get_env(rabbit, backing_queue_module),
 | 
			
		||||
    ok = BQ:stop(VHost).
 | 
			
		||||
 | 
			
		||||
is_replicated() ->
 | 
			
		||||
    false.
 | 
			
		||||
 | 
			
		||||
rebalance_module() ->
 | 
			
		||||
    {error, not_supported}.
 | 
			
		||||
 | 
			
		||||
list_with_minimum_quorum() ->
 | 
			
		||||
    [].
 | 
			
		||||
 | 
			
		||||
drain(_TransferCandidates) ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
revive() ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_vm_stats_sups() ->
 | 
			
		||||
    {[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}.
 | 
			
		||||
 | 
			
		||||
%% return nothing because of this line in rabbit_vm:
 | 
			
		||||
%% {msg_index,            MsgIndexETS + MsgIndexProc},
 | 
			
		||||
%% it mixes procs and ets,
 | 
			
		||||
%% TODO: maybe instead of separating sups and ets
 | 
			
		||||
%% I need vm_memory callback that just
 | 
			
		||||
%% returns proplist? And rabbit_vm calculates
 | 
			
		||||
%% Other as usual by substraction.
 | 
			
		||||
queue_vm_ets() ->
 | 
			
		||||
    {[],
 | 
			
		||||
     []}.
 | 
			
		||||
 | 
			
		||||
dir_base() ->
 | 
			
		||||
    [rabbit_vhost:msg_store_dir_base()].
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,22 +74,15 @@ gc_local_queues() ->
 | 
			
		|||
    GbSetDown = gb_sets:from_list(QueuesDown),
 | 
			
		||||
    gc_queue_metrics(GbSet, GbSetDown),
 | 
			
		||||
    gc_entity(queue_coarse_metrics, GbSet),
 | 
			
		||||
    Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
 | 
			
		||||
    gc_leader_data(Followers).
 | 
			
		||||
    %% remove coarse metrics for quorum queues without local leader
 | 
			
		||||
    gc_leader_data().
 | 
			
		||||
 | 
			
		||||
gc_leader_data(Followers) ->
 | 
			
		||||
    ets:foldl(fun({Id, _, _, _, _}, none) ->
 | 
			
		||||
                      gc_leader_data(Id, queue_coarse_metrics, Followers)
 | 
			
		||||
              end, none, queue_coarse_metrics).
 | 
			
		||||
 | 
			
		||||
gc_leader_data(Id, Table, GbSet) ->
 | 
			
		||||
    case gb_sets:is_member(Id, GbSet) of
 | 
			
		||||
        true ->
 | 
			
		||||
            ets:delete(Table, Id),
 | 
			
		||||
            none;
 | 
			
		||||
        false ->
 | 
			
		||||
            none
 | 
			
		||||
    end.
 | 
			
		||||
gc_leader_data() ->
 | 
			
		||||
    _ = [begin
 | 
			
		||||
             QName = amqqueue:get_name(Q),
 | 
			
		||||
             rabbit_core_metrics:delete_queue_coarse_metrics(QName)
 | 
			
		||||
         end || Q <- rabbit_amqqueue:list_local_followers()],
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
gc_global_queues() ->
 | 
			
		||||
    GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1045,16 +1045,11 @@ list_queues() ->
 | 
			
		|||
 | 
			
		||||
queue_definition(Q) ->
 | 
			
		||||
    #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
 | 
			
		||||
    Type = case amqqueue:get_type(Q) of
 | 
			
		||||
               rabbit_classic_queue -> classic;
 | 
			
		||||
               rabbit_quorum_queue -> quorum;
 | 
			
		||||
               rabbit_stream_queue -> stream;
 | 
			
		||||
               T -> T
 | 
			
		||||
           end,
 | 
			
		||||
    TypeModule =  amqqueue:get_type(Q),
 | 
			
		||||
    #{
 | 
			
		||||
        <<"vhost">> => VHost,
 | 
			
		||||
        <<"name">> => Name,
 | 
			
		||||
        <<"type">> => Type,
 | 
			
		||||
        <<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule),
 | 
			
		||||
        <<"durable">> => amqqueue:is_durable(Q),
 | 
			
		||||
        <<"auto_delete">> => amqqueue:is_auto_delete(Q),
 | 
			
		||||
        <<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -538,14 +538,8 @@ redeliver0(#pending{delivery = Msg0,
 | 
			
		|||
clients_redeliver(Qs, QTypeState) ->
 | 
			
		||||
    lists:filter(fun(Q) ->
 | 
			
		||||
                         case rabbit_queue_type:module(Q, QTypeState) of
 | 
			
		||||
                             {ok, rabbit_quorum_queue} ->
 | 
			
		||||
                                 % If #enqueue{} Raft command does not get applied
 | 
			
		||||
                                 % rabbit_fifo_client will resend.
 | 
			
		||||
                                 true;
 | 
			
		||||
                             {ok, rabbit_stream_queue} ->
 | 
			
		||||
                                 true;
 | 
			
		||||
                             _ ->
 | 
			
		||||
                                 false
 | 
			
		||||
                             {ok, TypeModule} -> TypeModule:can_redeliver();
 | 
			
		||||
                             _ -> false
 | 
			
		||||
                         end
 | 
			
		||||
                 end, Qs).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -266,8 +266,8 @@ messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
 | 
			
		|||
            end,
 | 
			
		||||
    counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num).
 | 
			
		||||
 | 
			
		||||
messages_dead_lettered_confirmed(rabbit_quorum_queue, at_least_once, Num) ->
 | 
			
		||||
    counters:add(fetch(rabbit_quorum_queue, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
 | 
			
		||||
messages_dead_lettered_confirmed(QTypeModule, at_least_once, Num) ->
 | 
			
		||||
    counters:add(fetch(QTypeModule, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
 | 
			
		||||
 | 
			
		||||
fetch(Protocol) ->
 | 
			
		||||
    persistent_term:get({?MODULE, Protocol}).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -33,7 +33,6 @@
 | 
			
		|||
    close_all_client_connections/0,
 | 
			
		||||
    primary_replica_transfer_candidate_nodes/0,
 | 
			
		||||
    random_primary_replica_transfer_candidate_node/2,
 | 
			
		||||
    transfer_leadership_of_quorum_queues/1,
 | 
			
		||||
    table_definitions/0
 | 
			
		||||
]).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -78,13 +77,7 @@ drain() ->
 | 
			
		|||
    TransferCandidates = primary_replica_transfer_candidate_nodes(),
 | 
			
		||||
    %% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues
 | 
			
		||||
    %% in the cluster, unlike with CMQs.
 | 
			
		||||
    transfer_leadership_of_quorum_queues(TransferCandidates),
 | 
			
		||||
    stop_local_quorum_queue_followers(),
 | 
			
		||||
 | 
			
		||||
    case whereis(rabbit_stream_coordinator) of
 | 
			
		||||
        undefined -> ok;
 | 
			
		||||
        _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates)
 | 
			
		||||
    end,
 | 
			
		||||
    rabbit_queue_type:drain(TransferCandidates),
 | 
			
		||||
 | 
			
		||||
    transfer_leadership_of_metadata_store(TransferCandidates),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -99,7 +92,7 @@ drain() ->
 | 
			
		|||
-spec revive() -> ok.
 | 
			
		||||
revive() ->
 | 
			
		||||
    rabbit_log:info("This node is being revived from maintenance (drain) mode"),
 | 
			
		||||
    revive_local_quorum_queue_replicas(),
 | 
			
		||||
    rabbit_queue_type:revive(),
 | 
			
		||||
    rabbit_log:info("Resumed all listeners and will accept client connections again"),
 | 
			
		||||
    _ = resume_all_client_listeners(),
 | 
			
		||||
    rabbit_log:info("Resumed all listeners and will accept client connections again"),
 | 
			
		||||
| 
						 | 
				
			
			@ -186,32 +179,6 @@ close_all_client_connections() ->
 | 
			
		|||
    rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"),
 | 
			
		||||
    {ok, length(Pids)}.
 | 
			
		||||
 | 
			
		||||
-spec transfer_leadership_of_quorum_queues([node()]) -> ok.
 | 
			
		||||
transfer_leadership_of_quorum_queues([]) ->
 | 
			
		||||
    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
 | 
			
		||||
                       "(online, not under maintenance) nodes to transfer to!");
 | 
			
		||||
transfer_leadership_of_quorum_queues(_TransferCandidates) ->
 | 
			
		||||
    %% we only transfer leadership for QQs that have local leaders
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_leaders(),
 | 
			
		||||
    rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
 | 
			
		||||
                    [length(Queues)]),
 | 
			
		||||
    [begin
 | 
			
		||||
        Name = amqqueue:get_name(Q),
 | 
			
		||||
        rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts",
 | 
			
		||||
                         [rabbit_misc:rs(Name)]),
 | 
			
		||||
        %% we trigger an election and exclude this node from the list of candidates
 | 
			
		||||
        %% by simply shutting its local QQ replica (Ra server)
 | 
			
		||||
        RaLeader = amqqueue:get_pid(Q),
 | 
			
		||||
        rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]),
 | 
			
		||||
        case rabbit_quorum_queue:stop_server(RaLeader) of
 | 
			
		||||
            ok     ->
 | 
			
		||||
                rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]);
 | 
			
		||||
            {error, nodedown} ->
 | 
			
		||||
                rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
 | 
			
		||||
        end
 | 
			
		||||
     end || Q <- Queues],
 | 
			
		||||
    rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
 | 
			
		||||
 | 
			
		||||
transfer_leadership_of_metadata_store(TransferCandidates) ->
 | 
			
		||||
    rabbit_log:info("Will transfer leadership of metadata store with current leader on this node",
 | 
			
		||||
                    []),
 | 
			
		||||
| 
						 | 
				
			
			@ -224,47 +191,6 @@ transfer_leadership_of_metadata_store(TransferCandidates) ->
 | 
			
		|||
            rabbit_log:warning("Skipping leadership transfer of metadata store: ~p", [Error])
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec transfer_leadership_of_stream_coordinator([node()]) -> ok.
 | 
			
		||||
transfer_leadership_of_stream_coordinator([]) ->
 | 
			
		||||
    rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate "
 | 
			
		||||
                       "(online, not under maintenance) nodes to transfer to!");
 | 
			
		||||
transfer_leadership_of_stream_coordinator(TransferCandidates) ->
 | 
			
		||||
    % try to transfer to the node with the lowest uptime; the assumption is that
 | 
			
		||||
    % nodes are usually restarted in a rolling fashion, in a consistent order;
 | 
			
		||||
    % therefore, the youngest node has already been restarted  or (if we are draining the first node)
 | 
			
		||||
    % that it will be restarted last. either way, this way we limit the number of transfers
 | 
			
		||||
    Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]),
 | 
			
		||||
    Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U}  end, TransferCandidates, Uptimes),
 | 
			
		||||
    BestCandidate = element(1, hd(lists:keysort(2, Candidates))),
 | 
			
		||||
    case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of
 | 
			
		||||
        {ok, Node} ->
 | 
			
		||||
            rabbit_log:info("Leadership transfer for stream coordinator completed. The new leader is ~p", [Node]);
 | 
			
		||||
        Error ->
 | 
			
		||||
            rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error])
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec stop_local_quorum_queue_followers() -> ok.
 | 
			
		||||
stop_local_quorum_queue_followers() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_followers(),
 | 
			
		||||
    rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
 | 
			
		||||
                    [length(Queues)]),
 | 
			
		||||
    [begin
 | 
			
		||||
        Name = amqqueue:get_name(Q),
 | 
			
		||||
        rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts",
 | 
			
		||||
                         [rabbit_misc:rs(Name)]),
 | 
			
		||||
        %% shut down Ra nodes so that they are not considered for leader election
 | 
			
		||||
        {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
 | 
			
		||||
        RaNode = {RegisteredName, node()},
 | 
			
		||||
        rabbit_log:debug("Will stop Ra server ~tp", [RaNode]),
 | 
			
		||||
        case rabbit_quorum_queue:stop_server(RaNode) of
 | 
			
		||||
            ok     ->
 | 
			
		||||
                rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]);
 | 
			
		||||
            {error, nodedown} ->
 | 
			
		||||
                rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
 | 
			
		||||
        end
 | 
			
		||||
     end || Q <- Queues],
 | 
			
		||||
    rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
 | 
			
		||||
 | 
			
		||||
-spec primary_replica_transfer_candidate_nodes() -> [node()].
 | 
			
		||||
primary_replica_transfer_candidate_nodes() ->
 | 
			
		||||
    filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
 | 
			
		||||
| 
						 | 
				
			
			@ -289,24 +215,6 @@ random_nth(Nodes) ->
 | 
			
		|||
    Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)),
 | 
			
		||||
    lists:nth(Nth + 1, Nodes).
 | 
			
		||||
 | 
			
		||||
revive_local_quorum_queue_replicas() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_followers(),
 | 
			
		||||
    %% NB: this function ignores the first argument so we can just pass the
 | 
			
		||||
    %% empty binary as the vhost name.
 | 
			
		||||
    {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues),
 | 
			
		||||
    rabbit_log:debug("Successfully revived ~b quorum queue replicas",
 | 
			
		||||
                     [length(Recovered)]),
 | 
			
		||||
    case length(Failed) of
 | 
			
		||||
        0 ->
 | 
			
		||||
            ok;
 | 
			
		||||
        NumFailed ->
 | 
			
		||||
            rabbit_log:error("Failed to revive ~b quorum queue replicas",
 | 
			
		||||
                             [NumFailed])
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    rabbit_log:info("Restart of local quorum queue replicas is complete"),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
%%
 | 
			
		||||
%% Implementation
 | 
			
		||||
%%
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,10 +7,21 @@
 | 
			
		|||
 | 
			
		||||
-module(rabbit_observer_cli).
 | 
			
		||||
 | 
			
		||||
-export([init/0]).
 | 
			
		||||
-export([init/0, add_plugin/1]).
 | 
			
		||||
 | 
			
		||||
init() ->
 | 
			
		||||
    application:set_env(observer_cli, plugins, [
 | 
			
		||||
        rabbit_observer_cli_classic_queues:plugin_info(),
 | 
			
		||||
        rabbit_observer_cli_quorum_queues:plugin_info()
 | 
			
		||||
    ]).
 | 
			
		||||
 | 
			
		||||
%% must be executed after observer_cli boot_step
 | 
			
		||||
add_plugin(PluginInfo) ->
 | 
			
		||||
    case application:get_env(observer_cli, plugins, undefined) of
 | 
			
		||||
        undefined -> %% shouldn't be there, die
 | 
			
		||||
            exit({rabbit_observer_cli_step_not_there, "Can't add observer_cli plugin, required boot_step wasn't executed"});
 | 
			
		||||
        Plugins when is_list(Plugins) ->
 | 
			
		||||
            application:set_env(observer_cli, plugins, Plugins ++ [PluginInfo]);
 | 
			
		||||
        _ ->
 | 
			
		||||
            exit({rabbit_observer_cli_plugins_error, "Can't add observer_cli plugin, existing entry is not a list"})
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -493,10 +493,13 @@ matches_type(_,        _)               -> false.
 | 
			
		|||
 | 
			
		||||
matches_queue_type(queue, _, <<"all">>)    -> true;
 | 
			
		||||
matches_queue_type(queue, _, <<"queues">>) -> true;
 | 
			
		||||
matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true;
 | 
			
		||||
matches_queue_type(queue, rabbit_quorum_queue,  <<"quorum_queues">>)  -> true;
 | 
			
		||||
matches_queue_type(queue, rabbit_stream_queue,  <<"streams">>)        -> true;
 | 
			
		||||
matches_queue_type(queue, _, _) -> false.
 | 
			
		||||
matches_queue_type(queue, TypeModule, Term) ->
 | 
			
		||||
    %% we assume here TypeModule comes from queue struct,
 | 
			
		||||
    %% therefore it is used and loaded - no need to check
 | 
			
		||||
    %% with registry.
 | 
			
		||||
    %% we also assume here and elsewhere that queue type
 | 
			
		||||
    %% module developer implemented all needed callbacks
 | 
			
		||||
    TypeModule:policy_apply_to_name() == Term.
 | 
			
		||||
 | 
			
		||||
priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -578,9 +581,20 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
 | 
			
		|||
apply_to_validation(_Name, <<"all">>)       -> ok;
 | 
			
		||||
apply_to_validation(_Name, <<"exchanges">>) -> ok;
 | 
			
		||||
apply_to_validation(_Name, <<"queues">>)    -> ok;
 | 
			
		||||
apply_to_validation(_Name, <<"classic_queues">>)    -> ok;
 | 
			
		||||
apply_to_validation(_Name, <<"quorum_queues">>)    -> ok;
 | 
			
		||||
apply_to_validation(_Name, <<"streams">>)    -> ok;
 | 
			
		||||
apply_to_validation(_Name, Term) ->
 | 
			
		||||
    {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
 | 
			
		||||
     " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}.
 | 
			
		||||
    %% as a last restort go to queue types registry
 | 
			
		||||
    %% and try to find something here
 | 
			
		||||
    case maybe_apply_to_queue_type(Term) of
 | 
			
		||||
        true -> ok;
 | 
			
		||||
        false ->
 | 
			
		||||
            %% TODO: get recognized queue terms from queue types from queue type.
 | 
			
		||||
            {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
 | 
			
		||||
             " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
maybe_apply_to_queue_type(Term) ->
 | 
			
		||||
    [] =/= lists:filter(fun({_TypeName, TypeModule}) ->
 | 
			
		||||
                               TypeModule:policy_apply_to_name() == Term
 | 
			
		||||
                       end,
 | 
			
		||||
                       rabbit_registry:lookup_all(queue)).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,7 +45,7 @@ queue_leader_locators() ->
 | 
			
		|||
-spec select_leader_and_followers(amqqueue:amqqueue(), pos_integer()) ->
 | 
			
		||||
    {Leader :: node(), Followers :: [node()]}.
 | 
			
		||||
select_leader_and_followers(Q, Size)
 | 
			
		||||
  when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q) orelse ?amqqueue_is_classic(Q)) andalso is_integer(Size) ->
 | 
			
		||||
  when (?is_amqqueue_v2(Q)) andalso is_integer(Size) ->
 | 
			
		||||
    LeaderLocator = leader_locator(Q),
 | 
			
		||||
    QueueType = amqqueue:get_type(Q),
 | 
			
		||||
    do_select_leader_and_followers(Size, QueueType, LeaderLocator).
 | 
			
		||||
| 
						 | 
				
			
			@ -109,6 +109,7 @@ leader_locator0(_) ->
 | 
			
		|||
    %% default
 | 
			
		||||
    <<"client-local">>.
 | 
			
		||||
 | 
			
		||||
%% TODO: allow dispatching by queue type
 | 
			
		||||
-spec select_members(pos_integer(), rabbit_queue_type:queue_type(), [node(),...], [node(),...],
 | 
			
		||||
                      non_neg_integer(), non_neg_integer(), function()) ->
 | 
			
		||||
    {[node(),...], function()}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -62,7 +62,14 @@
 | 
			
		|||
         arguments/1,
 | 
			
		||||
         arguments/2,
 | 
			
		||||
         notify_decorators/1,
 | 
			
		||||
         publish_at_most_once/2
 | 
			
		||||
         publish_at_most_once/2,
 | 
			
		||||
         can_redeliver/2,
 | 
			
		||||
         stop/1,
 | 
			
		||||
         endangered_queues/0,
 | 
			
		||||
         drain/1,
 | 
			
		||||
         revive/0,
 | 
			
		||||
         queue_vm_stats_sups/0,
 | 
			
		||||
         queue_vm_ets/0
 | 
			
		||||
         ]).
 | 
			
		||||
 | 
			
		||||
-export([
 | 
			
		||||
| 
						 | 
				
			
			@ -77,7 +84,7 @@
 | 
			
		|||
%% sequence number typically
 | 
			
		||||
-type correlation() :: term().
 | 
			
		||||
-type arguments() :: queue_arguments | consumer_arguments.
 | 
			
		||||
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
 | 
			
		||||
-type queue_type() ::  module().
 | 
			
		||||
%% see AMQP 1.0 §2.6.7
 | 
			
		||||
-type delivery_count() :: sequence_no().
 | 
			
		||||
-type credit() :: uint().
 | 
			
		||||
| 
						 | 
				
			
			@ -86,10 +93,6 @@
 | 
			
		|||
 | 
			
		||||
-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, type, state]).
 | 
			
		||||
 | 
			
		||||
%% TODO resolve all registered queue types from registry
 | 
			
		||||
-define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]).
 | 
			
		||||
-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
 | 
			
		||||
 | 
			
		||||
-type credit_reply_action() :: {credit_reply, rabbit_types:ctag(), delivery_count(), credit(),
 | 
			
		||||
                                Available :: non_neg_integer(), Drain :: boolean()}.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -274,75 +277,59 @@
 | 
			
		|||
-callback notify_decorators(amqqueue:amqqueue()) ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
-callback queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
 | 
			
		||||
-callback feature_flag_name() -> atom().
 | 
			
		||||
 | 
			
		||||
-callback policy_apply_to_name() -> binary().
 | 
			
		||||
 | 
			
		||||
%% -callback on_node_up(node()) -> ok.
 | 
			
		||||
 | 
			
		||||
%% -callback on_node_down(node()) -> ok.
 | 
			
		||||
 | 
			
		||||
-callback can_redeliver() -> boolean().
 | 
			
		||||
 | 
			
		||||
-callback stop(rabbit_types:vhost()) -> ok.
 | 
			
		||||
 | 
			
		||||
-callback is_replicated() -> boolean().
 | 
			
		||||
 | 
			
		||||
-callback rebalance_module() -> module() | {error, not_supported}.
 | 
			
		||||
 | 
			
		||||
-callback list_with_minimum_quorum() -> [amqqueue:amqqueue()].
 | 
			
		||||
 | 
			
		||||
-callback drain([node()]) -> ok.
 | 
			
		||||
 | 
			
		||||
-callback revive() -> ok.
 | 
			
		||||
 | 
			
		||||
%% used by rabbit_vm to emit queue process
 | 
			
		||||
%% (currently memory and binary) stats
 | 
			
		||||
-callback queue_vm_stats_sups() -> {StatsKeys :: [atom()], SupsNames:: [[atom()]]}.
 | 
			
		||||
 | 
			
		||||
-spec discover(binary() | atom()) -> queue_type().
 | 
			
		||||
discover(<<"undefined">>) ->
 | 
			
		||||
    fallback();
 | 
			
		||||
discover(undefined) ->
 | 
			
		||||
    fallback();
 | 
			
		||||
%% TODO: should this use a registry that's populated on boot?
 | 
			
		||||
discover(<<"quorum">>) ->
 | 
			
		||||
    rabbit_quorum_queue;
 | 
			
		||||
discover(rabbit_quorum_queue) ->
 | 
			
		||||
    rabbit_quorum_queue;
 | 
			
		||||
discover(<<"classic">>) ->
 | 
			
		||||
    rabbit_classic_queue;
 | 
			
		||||
discover(rabbit_classic_queue) ->
 | 
			
		||||
    rabbit_classic_queue;
 | 
			
		||||
discover(rabbit_stream_queue) ->
 | 
			
		||||
    rabbit_stream_queue;
 | 
			
		||||
discover(<<"stream">>) ->
 | 
			
		||||
    rabbit_stream_queue;
 | 
			
		||||
discover(Other) when is_atom(Other) ->
 | 
			
		||||
    discover(rabbit_data_coercion:to_binary(Other));
 | 
			
		||||
discover(Other) when is_binary(Other) ->
 | 
			
		||||
    T = rabbit_registry:binary_to_type(Other),
 | 
			
		||||
    rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
 | 
			
		||||
    {ok, Mod} = rabbit_registry:lookup_module(queue, T),
 | 
			
		||||
    Mod.
 | 
			
		||||
discover(TypeDescriptor) ->
 | 
			
		||||
    {ok, TypeModule} = rabbit_registry:lookup_type_module(queue, TypeDescriptor),
 | 
			
		||||
    TypeModule.
 | 
			
		||||
 | 
			
		||||
-spec short_alias_of(queue_type()) -> binary().
 | 
			
		||||
%% The opposite of discover/1: returns a short alias given a module name
 | 
			
		||||
short_alias_of(<<"rabbit_quorum_queue">>) ->
 | 
			
		||||
    <<"quorum">>;
 | 
			
		||||
short_alias_of(rabbit_quorum_queue) ->
 | 
			
		||||
    <<"quorum">>;
 | 
			
		||||
%% AMQP 1.0 management client
 | 
			
		||||
short_alias_of({utf8, <<"quorum">>}) ->
 | 
			
		||||
    <<"quorum">>;
 | 
			
		||||
short_alias_of(<<"rabbit_classic_queue">>) ->
 | 
			
		||||
    <<"classic">>;
 | 
			
		||||
short_alias_of(rabbit_classic_queue) ->
 | 
			
		||||
    <<"classic">>;
 | 
			
		||||
%% AMQP 1.0 management client
 | 
			
		||||
short_alias_of({utf8, <<"classic">>}) ->
 | 
			
		||||
    <<"classic">>;
 | 
			
		||||
short_alias_of(<<"rabbit_stream_queue">>) ->
 | 
			
		||||
    <<"stream">>;
 | 
			
		||||
short_alias_of(rabbit_stream_queue) ->
 | 
			
		||||
    <<"stream">>;
 | 
			
		||||
%% AMQP 1.0 management client
 | 
			
		||||
short_alias_of({utf8, <<"stream">>}) ->
 | 
			
		||||
    <<"stream">>;
 | 
			
		||||
%% for cases where this function is used for
 | 
			
		||||
%% formatting of values that already might use these
 | 
			
		||||
%% short aliases
 | 
			
		||||
short_alias_of(<<"quorum">>) ->
 | 
			
		||||
    <<"quorum">>;
 | 
			
		||||
short_alias_of(<<"classic">>) ->
 | 
			
		||||
    <<"classic">>;
 | 
			
		||||
short_alias_of(<<"stream">>) ->
 | 
			
		||||
    <<"stream">>;
 | 
			
		||||
short_alias_of(_Other) ->
 | 
			
		||||
    undefined.
 | 
			
		||||
-spec short_alias_of(TypeDescriptor) -> Ret when
 | 
			
		||||
      TypeDescriptor :: atom() | binary(),
 | 
			
		||||
      Ret :: binary().
 | 
			
		||||
short_alias_of(TypeDescriptor) ->
 | 
			
		||||
    case rabbit_registry:lookup_type_name(queue, TypeDescriptor) of
 | 
			
		||||
        {ok, TypeName} -> TypeName;
 | 
			
		||||
        _ -> undefined
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
feature_flag_name(<<"quorum">>) ->
 | 
			
		||||
    quorum_queue;
 | 
			
		||||
feature_flag_name(<<"classic">>) ->
 | 
			
		||||
    undefined;
 | 
			
		||||
feature_flag_name(<<"stream">>) ->
 | 
			
		||||
    stream_queue;
 | 
			
		||||
feature_flag_name(_) ->
 | 
			
		||||
    undefined.
 | 
			
		||||
feature_flag_name(TypeDescriptor) ->
 | 
			
		||||
    case rabbit_registry:lookup_type_module(queue, TypeDescriptor) of
 | 
			
		||||
        {ok, TypeModule} ->
 | 
			
		||||
            TypeModule:feature_flag_name();
 | 
			
		||||
        _ -> undefined
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
%% If the client does not specify the type, the virtual host does not have any
 | 
			
		||||
%% metadata default, and rabbit.default_queue_type is not set in the application env,
 | 
			
		||||
| 
						 | 
				
			
			@ -362,15 +349,15 @@ default() ->
 | 
			
		|||
default_alias() ->
 | 
			
		||||
    short_alias_of(default()).
 | 
			
		||||
 | 
			
		||||
%% used for example like this
 | 
			
		||||
%% {{utf8, <<"type">>}, {utf8, rabbit_queue_type:to_binary(QType)}},
 | 
			
		||||
%% so not just any binary but a type name
 | 
			
		||||
-spec to_binary(module()) -> binary().
 | 
			
		||||
to_binary(rabbit_classic_queue) ->
 | 
			
		||||
    <<"classic">>;
 | 
			
		||||
to_binary(rabbit_quorum_queue) ->
 | 
			
		||||
    <<"quorum">>;
 | 
			
		||||
to_binary(rabbit_stream_queue) ->
 | 
			
		||||
    <<"stream">>;
 | 
			
		||||
to_binary(Other) ->
 | 
			
		||||
    atom_to_binary(Other).
 | 
			
		||||
to_binary(TypeModule) ->
 | 
			
		||||
    case rabbit_registry:lookup_type_name(queue, TypeModule) of
 | 
			
		||||
        {ok, TypeName} -> TypeName;
 | 
			
		||||
        _ -> undefined
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
%% is a specific queue type implementation enabled
 | 
			
		||||
-spec is_enabled(module()) -> boolean().
 | 
			
		||||
| 
						 | 
				
			
			@ -849,14 +836,13 @@ qref(Q) when ?is_amqqueue(Q) ->
 | 
			
		|||
known_queue_type_modules() ->
 | 
			
		||||
    Registered = rabbit_registry:lookup_all(queue),
 | 
			
		||||
    {_, Modules} = lists:unzip(Registered),
 | 
			
		||||
    ?QUEUE_MODULES ++ Modules.
 | 
			
		||||
    Modules.
 | 
			
		||||
 | 
			
		||||
-spec known_queue_type_names() -> [binary()].
 | 
			
		||||
known_queue_type_names() ->
 | 
			
		||||
    Registered = rabbit_registry:lookup_all(queue),
 | 
			
		||||
    {QueueTypes, _} = lists:unzip(Registered),
 | 
			
		||||
    QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
 | 
			
		||||
    ?KNOWN_QUEUE_TYPES ++ QTypeBins.
 | 
			
		||||
    lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes).
 | 
			
		||||
 | 
			
		||||
inject_dqt(VHost) when ?is_vhost(VHost) ->
 | 
			
		||||
    inject_dqt(vhost:to_map(VHost));
 | 
			
		||||
| 
						 | 
				
			
			@ -920,3 +906,46 @@ check_cluster_queue_limit(Q) ->
 | 
			
		|||
 | 
			
		||||
queue_limit_error(Reason, ReasonArgs) ->
 | 
			
		||||
    {error, queue_limit_exceeded, Reason, ReasonArgs}.
 | 
			
		||||
 | 
			
		||||
-spec can_redeliver(queue_name(), state()) ->
 | 
			
		||||
    {ok, module()} | {error, not_found}.
 | 
			
		||||
can_redeliver(Q, State) ->
 | 
			
		||||
    case module(Q, State) of
 | 
			
		||||
        {ok, TypeModule} ->
 | 
			
		||||
            TypeModule:can_redeliver();
 | 
			
		||||
        _ -> false
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec stop(rabbit_types:vhost()) -> ok.
 | 
			
		||||
stop(VHost) ->
 | 
			
		||||
    %% original rabbit_amqqueue:stop doesn't do any catches or try after
 | 
			
		||||
    _ = [TypeModule:stop(VHost) || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
endangered_queues() ->
 | 
			
		||||
    lists:append([TypeModule:list_with_minimum_quorum()
 | 
			
		||||
                  || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)]).
 | 
			
		||||
 | 
			
		||||
drain(TransferCandidates) ->
 | 
			
		||||
    _ = [TypeModule:drain(TransferCandidates) ||
 | 
			
		||||
            {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
revive() ->
 | 
			
		||||
    _ = [TypeModule:revive() ||
 | 
			
		||||
            {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_vm_stats_sups() ->
 | 
			
		||||
    lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) ->
 | 
			
		||||
                        {Keys, Sups} = TypeModule:queue_vm_stats_sups(),
 | 
			
		||||
                        {KeysAcc ++ Keys, SupsAcc ++ Sups}
 | 
			
		||||
                end,
 | 
			
		||||
                {[], []}, rabbit_registry:lookup_all(queue)).
 | 
			
		||||
 | 
			
		||||
queue_vm_ets() ->
 | 
			
		||||
    lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) ->
 | 
			
		||||
                        {Keys, Tables} = TypeModule:queue_vm_ets(),
 | 
			
		||||
                        {KeysAcc ++ Keys, SupsAcc ++ Tables}
 | 
			
		||||
                end,
 | 
			
		||||
                {[], []}, rabbit_registry:lookup_all(queue)).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -77,6 +77,17 @@
 | 
			
		|||
         force_vhost_queues_shrink_member_to_current_member/1,
 | 
			
		||||
         force_all_queues_shrink_member_to_current_member/0]).
 | 
			
		||||
 | 
			
		||||
-export([queue_topology/1,
 | 
			
		||||
         feature_flag_name/0,
 | 
			
		||||
         policy_apply_to_name/0,
 | 
			
		||||
         can_redeliver/0,
 | 
			
		||||
         is_replicated/0,
 | 
			
		||||
         rebalance_module/0,
 | 
			
		||||
         drain/1,
 | 
			
		||||
         revive/0,
 | 
			
		||||
         queue_vm_stats_sups/0,
 | 
			
		||||
         queue_vm_ets/0]).
 | 
			
		||||
 | 
			
		||||
%% for backwards compatibility
 | 
			
		||||
-export([file_handle_leader_reservation/1,
 | 
			
		||||
         file_handle_other_reservation/0,
 | 
			
		||||
| 
						 | 
				
			
			@ -95,6 +106,15 @@
 | 
			
		|||
-include_lib("rabbit_common/include/rabbit.hrl").
 | 
			
		||||
-include("amqqueue.hrl").
 | 
			
		||||
 | 
			
		||||
-rabbit_boot_step(
 | 
			
		||||
   {rabbit_quorum_queue_type,
 | 
			
		||||
    [{description, "Quorum queue: queue type"},
 | 
			
		||||
     {mfa,      {rabbit_registry, register,
 | 
			
		||||
                    [queue, <<"quorum">>, ?MODULE]}},
 | 
			
		||||
     {cleanup,  {rabbit_registry, unregister,
 | 
			
		||||
                 [queue, <<"quorum">>]}},
 | 
			
		||||
     {requires, rabbit_registry}]}).
 | 
			
		||||
 | 
			
		||||
-type msg_id() :: non_neg_integer().
 | 
			
		||||
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(),
 | 
			
		||||
                 mc:state()}.
 | 
			
		||||
| 
						 | 
				
			
			@ -160,7 +180,7 @@
 | 
			
		|||
            [operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
 | 
			
		||||
     {mfa, {rabbit_registry, register,
 | 
			
		||||
            [policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
 | 
			
		||||
     {requires, rabbit_registry},
 | 
			
		||||
     {requires, [rabbit_registry]},
 | 
			
		||||
     {enables, recovery}]}).
 | 
			
		||||
 | 
			
		||||
validate_policy(Args) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -2145,3 +2165,113 @@ file_handle_other_reservation() ->
 | 
			
		|||
file_handle_release_reservation() ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
-spec queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
queue_topology(Q) ->
 | 
			
		||||
    [{leader, Leader0},
 | 
			
		||||
             {members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
 | 
			
		||||
            Leader = case Leader0 of
 | 
			
		||||
                         '' -> undefined;
 | 
			
		||||
                         _ -> Leader0
 | 
			
		||||
                     end,
 | 
			
		||||
            {Leader, Members}.
 | 
			
		||||
 | 
			
		||||
feature_flag_name() ->
 | 
			
		||||
    quorum_queue.
 | 
			
		||||
 | 
			
		||||
policy_apply_to_name() ->
 | 
			
		||||
    <<"quorum_queues">>.
 | 
			
		||||
 | 
			
		||||
can_redeliver() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
is_replicated() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
rebalance_module() ->
 | 
			
		||||
    ?MODULE.
 | 
			
		||||
 | 
			
		||||
-spec drain([node()]) -> ok.
 | 
			
		||||
drain(TransferCandidates) ->
 | 
			
		||||
    _ = transfer_leadership(TransferCandidates),
 | 
			
		||||
    _ = stop_local_quorum_queue_followers(),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
transfer_leadership([]) ->
 | 
			
		||||
    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
 | 
			
		||||
                       "(online, not under maintenance) nodes to transfer to!");
 | 
			
		||||
transfer_leadership(_TransferCandidates) ->
 | 
			
		||||
    %% we only transfer leadership for QQs that have local leaders
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_leaders(),
 | 
			
		||||
    rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
 | 
			
		||||
                    [length(Queues)]),
 | 
			
		||||
    [begin
 | 
			
		||||
        Name = amqqueue:get_name(Q),
 | 
			
		||||
        rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts",
 | 
			
		||||
                         [rabbit_misc:rs(Name)]),
 | 
			
		||||
        %% we trigger an election and exclude this node from the list of candidates
 | 
			
		||||
        %% by simply shutting its local QQ replica (Ra server)
 | 
			
		||||
        RaLeader = amqqueue:get_pid(Q),
 | 
			
		||||
        rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]),
 | 
			
		||||
        case rabbit_quorum_queue:stop_server(RaLeader) of
 | 
			
		||||
            ok     ->
 | 
			
		||||
                rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]);
 | 
			
		||||
            {error, nodedown} ->
 | 
			
		||||
                rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
 | 
			
		||||
        end
 | 
			
		||||
     end || Q <- Queues],
 | 
			
		||||
    rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
 | 
			
		||||
 | 
			
		||||
%% TODO: I just copied it over, it looks like was always called inside maintenance so...
 | 
			
		||||
-spec stop_local_quorum_queue_followers() -> ok.
 | 
			
		||||
stop_local_quorum_queue_followers() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_followers(),
 | 
			
		||||
    rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
 | 
			
		||||
                    [length(Queues)]),
 | 
			
		||||
    [begin
 | 
			
		||||
        Name = amqqueue:get_name(Q),
 | 
			
		||||
        rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts",
 | 
			
		||||
                         [rabbit_misc:rs(Name)]),
 | 
			
		||||
        %% shut down Ra nodes so that they are not considered for leader election
 | 
			
		||||
        {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
 | 
			
		||||
        RaNode = {RegisteredName, node()},
 | 
			
		||||
        rabbit_log:debug("Will stop Ra server ~tp", [RaNode]),
 | 
			
		||||
        case rabbit_quorum_queue:stop_server(RaNode) of
 | 
			
		||||
            ok     ->
 | 
			
		||||
                rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]);
 | 
			
		||||
            {error, nodedown} ->
 | 
			
		||||
                rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
 | 
			
		||||
        end
 | 
			
		||||
     end || Q <- Queues],
 | 
			
		||||
    rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
 | 
			
		||||
 | 
			
		||||
revive() ->
 | 
			
		||||
    revive_local_queue_replicas().
 | 
			
		||||
 | 
			
		||||
revive_local_queue_replicas() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list_local_followers(),
 | 
			
		||||
    %% NB: this function ignores the first argument so we can just pass the
 | 
			
		||||
    %% empty binary as the vhost name.
 | 
			
		||||
    {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues),
 | 
			
		||||
    rabbit_log:debug("Successfully revived ~b quorum queue replicas",
 | 
			
		||||
                     [length(Recovered)]),
 | 
			
		||||
    case length(Failed) of
 | 
			
		||||
        0 ->
 | 
			
		||||
            ok;
 | 
			
		||||
        NumFailed ->
 | 
			
		||||
            rabbit_log:error("Failed to revive ~b quorum queue replicas",
 | 
			
		||||
                             [NumFailed])
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    rabbit_log:info("Restart of local quorum queue replicas is complete"),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_vm_stats_sups() ->
 | 
			
		||||
    {[quorum_queue_procs,
 | 
			
		||||
      quorum_queue_dlx_procs],
 | 
			
		||||
     [[ra_server_sup_sup],
 | 
			
		||||
      [rabbit_fifo_dlx_sup]]}.
 | 
			
		||||
 | 
			
		||||
queue_vm_ets() ->
 | 
			
		||||
    {[quorum_ets],
 | 
			
		||||
     [[ra_log_ets]]}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -59,6 +59,18 @@
 | 
			
		|||
 | 
			
		||||
-export([check_max_segment_size_bytes/1]).
 | 
			
		||||
 | 
			
		||||
-export([queue_topology/1,
 | 
			
		||||
         feature_flag_name/0,
 | 
			
		||||
         policy_apply_to_name/0,
 | 
			
		||||
         can_redeliver/0,
 | 
			
		||||
         stop/1,
 | 
			
		||||
         is_replicated/0,
 | 
			
		||||
         rebalance_module/0,
 | 
			
		||||
         drain/1,
 | 
			
		||||
         revive/0,
 | 
			
		||||
         queue_vm_stats_sups/0,
 | 
			
		||||
         queue_vm_ets/0]).
 | 
			
		||||
 | 
			
		||||
-include_lib("rabbit_common/include/rabbit.hrl").
 | 
			
		||||
-include("amqqueue.hrl").
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -103,6 +115,17 @@
 | 
			
		|||
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
 | 
			
		||||
-import(rabbit_misc, [queue_resource/2]).
 | 
			
		||||
 | 
			
		||||
-rabbit_boot_step(
 | 
			
		||||
   {?MODULE,
 | 
			
		||||
    [{description, "Stream queue: queue type"},
 | 
			
		||||
     {mfa,      {rabbit_registry, register,
 | 
			
		||||
                    [queue, <<"stream">>, ?MODULE]}},
 | 
			
		||||
     %% {cleanup,  {rabbit_registry, unregister,
 | 
			
		||||
     %%             [queue, <<"stream">>]}},
 | 
			
		||||
     {requires, rabbit_registry}%%,
 | 
			
		||||
     %% {enables,     rabbit_stream_queue_type}
 | 
			
		||||
    ]}).
 | 
			
		||||
 | 
			
		||||
-type client() :: #stream_client{}.
 | 
			
		||||
 | 
			
		||||
-spec is_enabled() -> boolean().
 | 
			
		||||
| 
						 | 
				
			
			@ -832,10 +855,6 @@ status(Vhost, QueueName) ->
 | 
			
		|||
    %% Handle not found queues
 | 
			
		||||
    QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
            {error, classic_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
 | 
			
		||||
            {error, quorum_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_stream(Q) ->
 | 
			
		||||
            [begin
 | 
			
		||||
                 [get_key(role, C),
 | 
			
		||||
| 
						 | 
				
			
			@ -847,6 +866,8 @@ status(Vhost, QueueName) ->
 | 
			
		|||
                  get_key(readers, C),
 | 
			
		||||
                  get_key(segments, C)]
 | 
			
		||||
             end || C <- get_counters(Q)];
 | 
			
		||||
        {ok, _Q} ->
 | 
			
		||||
            {error, not_supported};
 | 
			
		||||
        {error, not_found} = E ->
 | 
			
		||||
            E
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			@ -905,10 +926,6 @@ tracking_status(Vhost, QueueName) ->
 | 
			
		|||
    %% Handle not found queues
 | 
			
		||||
    QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
            {error, classic_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
 | 
			
		||||
            {error, quorum_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_stream(Q) ->
 | 
			
		||||
            Leader = amqqueue:get_pid(Q),
 | 
			
		||||
            Map = osiris:read_tracking(Leader),
 | 
			
		||||
| 
						 | 
				
			
			@ -921,6 +938,8 @@ tracking_status(Vhost, QueueName) ->
 | 
			
		|||
                                                  {value, TrkData}] | Acc0]
 | 
			
		||||
                                        end, [], Trackings) ++ Acc
 | 
			
		||||
                      end, [], Map);
 | 
			
		||||
        {ok, Q} ->
 | 
			
		||||
            {error, {queue_not_supported, ?amqqueue_type(Q)}};
 | 
			
		||||
        {error, not_found} = E->
 | 
			
		||||
            E
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			@ -1021,10 +1040,6 @@ restart_stream(VHost, Queue, Options)
 | 
			
		|||
add_replica(VHost, Name, Node) ->
 | 
			
		||||
    QName = queue_resource(VHost, Name),
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
            {error, classic_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
 | 
			
		||||
            {error, quorum_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_stream(Q) ->
 | 
			
		||||
            case lists:member(Node, rabbit_nodes:list_running()) of
 | 
			
		||||
                false ->
 | 
			
		||||
| 
						 | 
				
			
			@ -1032,6 +1047,8 @@ add_replica(VHost, Name, Node) ->
 | 
			
		|||
                true ->
 | 
			
		||||
                    rabbit_stream_coordinator:add_replica(Q, Node)
 | 
			
		||||
            end;
 | 
			
		||||
        {ok, Q} ->
 | 
			
		||||
            {error, {queue_not_supported, ?amqqueue_type(Q)}};
 | 
			
		||||
        E ->
 | 
			
		||||
            E
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			@ -1039,14 +1056,12 @@ add_replica(VHost, Name, Node) ->
 | 
			
		|||
delete_replica(VHost, Name, Node) ->
 | 
			
		||||
    QName = queue_resource(VHost, Name),
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
            {error, classic_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
 | 
			
		||||
            {error, quorum_queue_not_supported};
 | 
			
		||||
        {ok, Q} when ?amqqueue_is_stream(Q) ->
 | 
			
		||||
            #{name := StreamId} = amqqueue:get_type_state(Q),
 | 
			
		||||
            {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
 | 
			
		||||
            Reply;
 | 
			
		||||
        {ok, Q} ->
 | 
			
		||||
            {error, {queue_not_supported, ?amqqueue_type(Q)}};
 | 
			
		||||
        E ->
 | 
			
		||||
            E
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			@ -1393,3 +1408,76 @@ delivery_count_add(none, _) ->
 | 
			
		|||
    none;
 | 
			
		||||
delivery_count_add(Count, N) ->
 | 
			
		||||
    serial_number:add(Count, N).
 | 
			
		||||
 | 
			
		||||
-spec queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
queue_topology(Q) ->
 | 
			
		||||
    #{name := StreamId} = amqqueue:get_type_state(Q),
 | 
			
		||||
    case rabbit_stream_coordinator:members(StreamId) of
 | 
			
		||||
        {ok, Members} ->
 | 
			
		||||
            maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
 | 
			
		||||
                              {Node, [Node | Replicas]};
 | 
			
		||||
                         (Node, {_Pid, replica}, {Writer, Replicas}) ->
 | 
			
		||||
                              {Writer, [Node | Replicas]}
 | 
			
		||||
                      end, {undefined, []}, Members);
 | 
			
		||||
                {error, _} ->
 | 
			
		||||
            {undefined, undefined}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
feature_flag_name() ->
 | 
			
		||||
    stream_queue.
 | 
			
		||||
 | 
			
		||||
policy_apply_to_name() ->
 | 
			
		||||
    <<"streams">>.
 | 
			
		||||
 | 
			
		||||
can_redeliver() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
stop(_VHost) ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
is_replicated() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
rebalance_module() ->
 | 
			
		||||
    ?MODULE.
 | 
			
		||||
 | 
			
		||||
drain(TransferCandidates) ->
 | 
			
		||||
    case whereis(rabbit_stream_coordinator) of
 | 
			
		||||
        undefined -> ok;
 | 
			
		||||
        _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
revive() ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
-spec transfer_leadership_of_stream_coordinator([node()]) -> ok.
 | 
			
		||||
transfer_leadership_of_stream_coordinator([]) ->
 | 
			
		||||
    rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate "
 | 
			
		||||
                       "(online, not under maintenance) nodes to transfer to!");
 | 
			
		||||
transfer_leadership_of_stream_coordinator(TransferCandidates) ->
 | 
			
		||||
    % try to transfer to the node with the lowest uptime; the assumption is that
 | 
			
		||||
    % nodes are usually restarted in a rolling fashion, in a consistent order;
 | 
			
		||||
    % therefore, the youngest node has already been restarted  or (if we are draining the first node)
 | 
			
		||||
    % that it will be restarted last. either way, this way we limit the number of transfers
 | 
			
		||||
    Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]),
 | 
			
		||||
    Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U}  end, TransferCandidates, Uptimes),
 | 
			
		||||
    BestCandidate = element(1, hd(lists:keysort(2, Candidates))),
 | 
			
		||||
    case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of
 | 
			
		||||
        {ok, Node} ->
 | 
			
		||||
            rabbit_log:info("Leadership transfer for stream coordinator completed. The new leader is ~p", [Node]);
 | 
			
		||||
        Error ->
 | 
			
		||||
            rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error])
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
queue_vm_stats_sups() ->
 | 
			
		||||
    {[stream_queue_procs,
 | 
			
		||||
      stream_queue_replica_reader_procs,
 | 
			
		||||
      stream_queue_coordinator_procs],
 | 
			
		||||
     [[osiris_server_sup],
 | 
			
		||||
      [osiris_replica_reader_sup],
 | 
			
		||||
      [rabbit_stream_coordinator]]}.
 | 
			
		||||
 | 
			
		||||
queue_vm_ets() ->
 | 
			
		||||
    {[],
 | 
			
		||||
     []}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -56,9 +56,7 @@ endangered_critical_components() ->
 | 
			
		|||
do_await_safe_online_quorum(0) ->
 | 
			
		||||
    false;
 | 
			
		||||
do_await_safe_online_quorum(IterationsLeft) ->
 | 
			
		||||
    EndangeredQueues = lists:append(
 | 
			
		||||
                         rabbit_quorum_queue:list_with_minimum_quorum(),
 | 
			
		||||
                         rabbit_stream_queue:list_with_minimum_quorum()),
 | 
			
		||||
    EndangeredQueues = rabbit_queue_type:endangered_queues(),
 | 
			
		||||
    case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
 | 
			
		||||
        true -> true;
 | 
			
		||||
        false ->
 | 
			
		||||
| 
						 | 
				
			
			@ -83,9 +81,7 @@ do_await_safe_online_quorum(IterationsLeft) ->
 | 
			
		|||
 | 
			
		||||
-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
 | 
			
		||||
list_with_minimum_quorum_for_cli() ->
 | 
			
		||||
    EndangeredQueues = lists:append(
 | 
			
		||||
                         rabbit_quorum_queue:list_with_minimum_quorum(),
 | 
			
		||||
                         rabbit_stream_queue:list_with_minimum_quorum()),
 | 
			
		||||
    EndangeredQueues = rabbit_queue_type:endangered_queues(),
 | 
			
		||||
    [amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++
 | 
			
		||||
    [#{
 | 
			
		||||
           <<"readable_name">> => C,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,7 +7,7 @@
 | 
			
		|||
 | 
			
		||||
-module(rabbit_vm).
 | 
			
		||||
 | 
			
		||||
-export([memory/0, binary/0, ets_tables_memory/1]).
 | 
			
		||||
-export([memory/0, binary/0, ets_tables_memory/1, all_vhosts_children/1]).
 | 
			
		||||
 | 
			
		||||
-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -16,19 +16,37 @@
 | 
			
		|||
-spec memory() -> rabbit_types:infos().
 | 
			
		||||
 | 
			
		||||
memory() ->
 | 
			
		||||
    All = interesting_sups(),
 | 
			
		||||
    %% this whole aggregation pipeline preserves sups order
 | 
			
		||||
    %% [{info_key, [SupName...]}...] i.e. flattened list of
 | 
			
		||||
    %% info key, sups list pairs for each queue type
 | 
			
		||||
    %% example for existing info keys:
 | 
			
		||||
    %% [{queue_procs,          queue_sups()},
 | 
			
		||||
    %%  {quorum_queue_procs,   [ra_server_sup_sup]},
 | 
			
		||||
    %%  {quorum_queue_dlx_procs, [rabbit_fifo_dlx_sup]},
 | 
			
		||||
    %%  {stream_queue_procs,   [osiris_server_sup]},
 | 
			
		||||
    %%  {stream_queue_replica_reader_procs,  [osiris_replica_reader_sup]},
 | 
			
		||||
    %%  {stream_queue_coordinator_procs, [rabbit_stream_coordinator]}]
 | 
			
		||||
    {QueueSupsStatsKeys, QueueStatsSups} = rabbit_queue_type:queue_vm_stats_sups(),
 | 
			
		||||
 | 
			
		||||
    %% we keep order and that means this variable queues part
 | 
			
		||||
    %% has to be matched somehow - | Rest is the best.
 | 
			
		||||
    All = interesting_sups() ++ QueueStatsSups,
 | 
			
		||||
    {Sums, _Other} = sum_processes(
 | 
			
		||||
                       lists:append(All), distinguishers(), [memory]),
 | 
			
		||||
 | 
			
		||||
    [Qs, Qqs, DlxWorkers, Ssqs, Srqs, SCoor, ConnsReader, ConnsWriter, ConnsChannel,
 | 
			
		||||
     ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
 | 
			
		||||
    [ConnsReader, ConnsWriter, ConnsChannel,
 | 
			
		||||
     ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] =
 | 
			
		||||
        [aggregate(Names, Sums, memory, fun (X) -> X end)
 | 
			
		||||
         || Names <- distinguished_interesting_sups()],
 | 
			
		||||
         || Names <- distinguished_interesting_sups() ++ QueueStatsSups],
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    {QueuesEtsStatsKeys, QueueStatsEtsNames} = rabbit_queue_type:queue_vm_ets(),
 | 
			
		||||
 | 
			
		||||
    QueuesEtsStats = lists:map(fun ets_memory/1, QueueStatsEtsNames),
 | 
			
		||||
 | 
			
		||||
    MnesiaETS           = mnesia_memory(),
 | 
			
		||||
    MsgIndexETS         = ets_memory(msg_stores()),
 | 
			
		||||
    MetricsETS          = ets_memory([rabbit_metrics]),
 | 
			
		||||
    QuorumETS           = ets_memory([ra_log_ets]),
 | 
			
		||||
    MetricsProc  = try
 | 
			
		||||
                       [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
 | 
			
		||||
                       M
 | 
			
		||||
| 
						 | 
				
			
			@ -63,23 +81,20 @@ memory() ->
 | 
			
		|||
 | 
			
		||||
    OtherProc = Processes
 | 
			
		||||
        - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
 | 
			
		||||
        - Qs - Qqs - DlxWorkers - Ssqs - Srqs - SCoor - MsgIndexProc - Plugins
 | 
			
		||||
        - lists:sum(QueueSupsStats) - MsgIndexProc - Plugins
 | 
			
		||||
        - MgmtDbProc - MetricsProc - MetadataStoreProc,
 | 
			
		||||
 | 
			
		||||
    [
 | 
			
		||||
     %% Connections
 | 
			
		||||
     {connection_readers,   ConnsReader},
 | 
			
		||||
     {connection_writers,   ConnsWriter},
 | 
			
		||||
     {connection_channels,  ConnsChannel},
 | 
			
		||||
     {connection_other,     ConnsOther},
 | 
			
		||||
     {connection_other,     ConnsOther}] ++
 | 
			
		||||
 | 
			
		||||
     %% Queues
 | 
			
		||||
     {queue_procs,          Qs},
 | 
			
		||||
     {quorum_queue_procs,   Qqs},
 | 
			
		||||
     {quorum_queue_dlx_procs, DlxWorkers},
 | 
			
		||||
     {stream_queue_procs,   Ssqs},
 | 
			
		||||
     {stream_queue_replica_reader_procs,  Srqs},
 | 
			
		||||
     {stream_queue_coordinator_procs, SCoor},
 | 
			
		||||
    lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++
 | 
			
		||||
 | 
			
		||||
    [
 | 
			
		||||
     %% Processes
 | 
			
		||||
     {plugins,              Plugins},
 | 
			
		||||
     {metadata_store,       MetadataStoreProc},
 | 
			
		||||
| 
						 | 
				
			
			@ -87,13 +102,16 @@ memory() ->
 | 
			
		|||
 | 
			
		||||
     %% Metrics
 | 
			
		||||
     {metrics,              MetricsETS + MetricsProc},
 | 
			
		||||
     {mgmt_db,              MgmtDbETS + MgmtDbProc},
 | 
			
		||||
     {mgmt_db,              MgmtDbETS + MgmtDbProc}] ++
 | 
			
		||||
 | 
			
		||||
     %% ETS
 | 
			
		||||
     %% queues
 | 
			
		||||
    lists:zip(QueuesEtsStatsKeys, QueuesEtsStats) ++
 | 
			
		||||
 | 
			
		||||
    [
 | 
			
		||||
     {mnesia,               MnesiaETS},
 | 
			
		||||
     {quorum_ets,           QuorumETS},
 | 
			
		||||
     {metadata_store_ets,   MetadataStoreETS},
 | 
			
		||||
     {other_ets,            ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS - MetadataStoreETS},
 | 
			
		||||
     {other_ets,            ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS  - MetadataStoreETS - lists:sum(QueuesEtsStats)},
 | 
			
		||||
 | 
			
		||||
     %% Messages (mostly, some binaries are not messages)
 | 
			
		||||
     {binary,               Bin},
 | 
			
		||||
| 
						 | 
				
			
			@ -110,6 +128,7 @@ memory() ->
 | 
			
		|||
                             {rss, Rss},
 | 
			
		||||
                             {allocated, Allocated}]}
 | 
			
		||||
    ].
 | 
			
		||||
 | 
			
		||||
%% [1] - erlang:memory(processes) can be less than the sum of its
 | 
			
		||||
%% parts. Rather than display something nonsensical, just silence any
 | 
			
		||||
%% claims about negative memory. See
 | 
			
		||||
| 
						 | 
				
			
			@ -118,7 +137,9 @@ memory() ->
 | 
			
		|||
-spec binary() -> rabbit_types:infos().
 | 
			
		||||
 | 
			
		||||
binary() ->
 | 
			
		||||
    All = interesting_sups(),
 | 
			
		||||
    {QueueSupsStatsKeys, QueueStatsSups} = rabbit_queue_type:queue_vm_stats_sups(),
 | 
			
		||||
 | 
			
		||||
    All = interesting_sups() ++ QueueStatsSups,
 | 
			
		||||
    {Sums, Rest} =
 | 
			
		||||
        sum_processes(
 | 
			
		||||
          lists:append(All),
 | 
			
		||||
| 
						 | 
				
			
			@ -127,10 +148,10 @@ binary() ->
 | 
			
		|||
                                      sets:add_element({Ptr, Sz}, Acc0)
 | 
			
		||||
                              end, Acc, Info)
 | 
			
		||||
          end, distinguishers(), [{binary, sets:new()}]),
 | 
			
		||||
    [Other, Qs, Qqs, DlxWorkers, Ssqs, Srqs, Scoor, ConnsReader, ConnsWriter,
 | 
			
		||||
     ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
 | 
			
		||||
    [Other, ConnsReader, ConnsWriter,
 | 
			
		||||
     ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] =
 | 
			
		||||
        [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
 | 
			
		||||
         || Names <- [[other] | distinguished_interesting_sups()]],
 | 
			
		||||
         || Names <- [[other] | distinguished_interesting_sups()] ++ QueueStatsSups],
 | 
			
		||||
    MetadataStoreProc = try
 | 
			
		||||
                            [{_, B}] = process_info(whereis(rabbit_khepri:get_ra_cluster_name()), [binary]),
 | 
			
		||||
                            lists:foldl(fun({_, Sz, _}, Acc) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -143,13 +164,10 @@ binary() ->
 | 
			
		|||
    [{connection_readers,  ConnsReader},
 | 
			
		||||
     {connection_writers,  ConnsWriter},
 | 
			
		||||
     {connection_channels, ConnsChannel},
 | 
			
		||||
     {connection_other,    ConnsOther},
 | 
			
		||||
     {queue_procs,         Qs},
 | 
			
		||||
     {quorum_queue_procs,  Qqs},
 | 
			
		||||
     {quorum_queue_dlx_procs, DlxWorkers},
 | 
			
		||||
     {stream_queue_procs,  Ssqs},
 | 
			
		||||
     {stream_queue_replica_reader_procs, Srqs},
 | 
			
		||||
     {stream_queue_coordinator_procs, Scoor},
 | 
			
		||||
     {connection_other,    ConnsOther}] ++
 | 
			
		||||
     %% Queues
 | 
			
		||||
    lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++
 | 
			
		||||
    [
 | 
			
		||||
     {metadata_store,      MetadataStoreProc},
 | 
			
		||||
     {plugins,             Plugins},
 | 
			
		||||
     {mgmt_db,             MgmtDbProc},
 | 
			
		||||
| 
						 | 
				
			
			@ -194,19 +212,7 @@ bytes(Words) ->  try
 | 
			
		|||
                 end.
 | 
			
		||||
 | 
			
		||||
interesting_sups() ->
 | 
			
		||||
    [queue_sups(), quorum_sups(), dlx_sups(),
 | 
			
		||||
     stream_server_sups(), stream_reader_sups(), stream_coordinator(),
 | 
			
		||||
     conn_sups() | interesting_sups0()].
 | 
			
		||||
 | 
			
		||||
queue_sups() ->
 | 
			
		||||
    all_vhosts_children(rabbit_amqqueue_sup_sup).
 | 
			
		||||
 | 
			
		||||
quorum_sups() -> [ra_server_sup_sup].
 | 
			
		||||
 | 
			
		||||
dlx_sups() -> [rabbit_fifo_dlx_sup].
 | 
			
		||||
stream_server_sups() -> [osiris_server_sup].
 | 
			
		||||
stream_reader_sups() -> [osiris_replica_reader_sup].
 | 
			
		||||
stream_coordinator() -> [rabbit_stream_coordinator].
 | 
			
		||||
    [conn_sups() | interesting_sups0()].
 | 
			
		||||
 | 
			
		||||
msg_stores() ->
 | 
			
		||||
    all_vhosts_children(msg_store_transient)
 | 
			
		||||
| 
						 | 
				
			
			@ -256,12 +262,6 @@ distinguishers() -> with(conn_sups(), fun conn_type/1).
 | 
			
		|||
 | 
			
		||||
distinguished_interesting_sups() ->
 | 
			
		||||
    [
 | 
			
		||||
     queue_sups(),
 | 
			
		||||
     quorum_sups(),
 | 
			
		||||
     dlx_sups(),
 | 
			
		||||
     stream_server_sups(),
 | 
			
		||||
     stream_reader_sups(),
 | 
			
		||||
     stream_coordinator(),
 | 
			
		||||
     with(conn_sups(), reader),
 | 
			
		||||
     with(conn_sups(), writer),
 | 
			
		||||
     with(conn_sups(), channel),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,8 @@
 | 
			
		|||
 | 
			
		||||
-include_lib("eunit/include/eunit.hrl").
 | 
			
		||||
 | 
			
		||||
-include_lib("rabbit_common/include/rabbit.hrl").
 | 
			
		||||
 | 
			
		||||
-export([
 | 
			
		||||
         wait_for_messages_ready/3,
 | 
			
		||||
         wait_for_messages_pending_ack/3,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -550,10 +550,10 @@ add_replica(Config) ->
 | 
			
		|||
    ?assertEqual({error, node_not_running},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, add_replica,
 | 
			
		||||
                          [<<"/">>, Q, Server1])),
 | 
			
		||||
    ?assertEqual({error, classic_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, add_replica,
 | 
			
		||||
                          [<<"/">>, QClassic, Server1])),
 | 
			
		||||
    ?assertEqual({error, quorum_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, add_replica,
 | 
			
		||||
                          [<<"/">>, QQuorum, Server1])),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -561,10 +561,10 @@ add_replica(Config) ->
 | 
			
		|||
    ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
 | 
			
		||||
    rabbit_control_helper:command(start_app, Server1),
 | 
			
		||||
    timer:sleep(1000),
 | 
			
		||||
    ?assertEqual({error, classic_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, add_replica,
 | 
			
		||||
                          [<<"/">>, QClassic, Server1])),
 | 
			
		||||
    ?assertEqual({error, quorum_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, add_replica,
 | 
			
		||||
                          [<<"/">>, QQuorum, Server1])),
 | 
			
		||||
    ?assertEqual(ok,
 | 
			
		||||
| 
						 | 
				
			
			@ -748,10 +748,10 @@ delete_classic_replica(Config) ->
 | 
			
		|||
    ?assertEqual({'queue.declare_ok', Q, 0, 0},
 | 
			
		||||
                 declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
 | 
			
		||||
    %% Not a member of the cluster, what would happen?
 | 
			
		||||
    ?assertEqual({error, classic_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, delete_replica,
 | 
			
		||||
                          [<<"/">>, Q, 'zen@rabbit'])),
 | 
			
		||||
    ?assertEqual({error, classic_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, delete_replica,
 | 
			
		||||
                          [<<"/">>, Q, Server1])),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
 | 
			
		||||
| 
						 | 
				
			
			@ -763,10 +763,10 @@ delete_quorum_replica(Config) ->
 | 
			
		|||
    ?assertEqual({'queue.declare_ok', Q, 0, 0},
 | 
			
		||||
                 declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 | 
			
		||||
    %% Not a member of the cluster, what would happen?
 | 
			
		||||
    ?assertEqual({error, quorum_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, delete_replica,
 | 
			
		||||
                          [<<"/">>, Q, 'zen@rabbit'])),
 | 
			
		||||
    ?assertEqual({error, quorum_queue_not_supported},
 | 
			
		||||
    ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
 | 
			
		||||
                 rpc:call(Server0, rabbit_stream_queue, delete_replica,
 | 
			
		||||
                          [<<"/">>, Q, Server1])),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,7 +36,10 @@
 | 
			
		|||
         queue_declared/1,
 | 
			
		||||
         queue_created/1,
 | 
			
		||||
         queue_deleted/1,
 | 
			
		||||
         queues_deleted/1]).
 | 
			
		||||
         queues_deleted/1,
 | 
			
		||||
         %% used by ra-based queues to cleanup follower metrics,
 | 
			
		||||
         %% see rabbit_core_metrics_gc for an example
 | 
			
		||||
         delete_queue_coarse_metrics/1]).
 | 
			
		||||
 | 
			
		||||
-export([node_stats/2]).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -321,10 +324,14 @@ partition_queues(Queues) ->
 | 
			
		|||
    [Queues].
 | 
			
		||||
 | 
			
		||||
delete_queue_metrics(Queue) ->
 | 
			
		||||
    ets:delete(queue_coarse_metrics, Queue),
 | 
			
		||||
    delete_queue_coarse_metrics(Queue),
 | 
			
		||||
    ets:update_element(queue_metrics, Queue, {3, 1}),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
delete_queue_coarse_metrics(Queue) ->
 | 
			
		||||
    ets:delete(queue_coarse_metrics, Queue),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
delete_channel_queue_exchange_metrics(MatchSpecCondition) ->
 | 
			
		||||
    ChannelQueueExchangeMetricsToUpdate = ets:select(
 | 
			
		||||
        channel_queue_exchange_metrics,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,7 +15,7 @@
 | 
			
		|||
         code_change/3]).
 | 
			
		||||
 | 
			
		||||
-export([register/3, unregister/2,
 | 
			
		||||
         binary_to_type/1, lookup_module/2, lookup_all/1]).
 | 
			
		||||
         binary_to_type/1, lookup_module/2, lookup_type_module/2, lookup_type_name/2, lookup_all/1]).
 | 
			
		||||
 | 
			
		||||
-define(SERVER, ?MODULE).
 | 
			
		||||
-define(ETS_NAME, ?MODULE).
 | 
			
		||||
| 
						 | 
				
			
			@ -61,6 +61,61 @@ lookup_module(Class, T) when is_atom(T) ->
 | 
			
		|||
            {error, not_found}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-spec lookup_type_module(Class, TypeDescriptor) ->
 | 
			
		||||
          Ret when
 | 
			
		||||
      Class :: atom(),
 | 
			
		||||
      TypeDescriptor :: atom() | %% can be TypeModule or Type
 | 
			
		||||
                        binary(), %% or whati currently called "alias" - a TypeName
 | 
			
		||||
      Ret :: {ok, TypeModule} | {error, not_found},
 | 
			
		||||
      TypeModule :: atom().
 | 
			
		||||
lookup_type_module(Class, TypeDescriptor) ->
 | 
			
		||||
    case lookup_type(Class, TypeDescriptor) of
 | 
			
		||||
            {error, _} = Error ->
 | 
			
		||||
            Error;
 | 
			
		||||
        {ok, {_TypeName, TypeModule}} ->
 | 
			
		||||
            {ok, TypeModule}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec lookup_type_name(Class, TypeDescriptor) ->
 | 
			
		||||
          Ret when
 | 
			
		||||
      Class :: atom(),
 | 
			
		||||
      TypeDescriptor :: atom() | %% either full typemodule or atomized typename
 | 
			
		||||
                        binary(), %% typename pr typemodule in binary
 | 
			
		||||
      Ret :: {ok, binary()} | {error, not_found}.
 | 
			
		||||
lookup_type_name(Class, TypeDescriptor) ->
 | 
			
		||||
    case lookup_type(Class, TypeDescriptor) of
 | 
			
		||||
            {error, _} = Error ->
 | 
			
		||||
            Error;
 | 
			
		||||
        {ok, {TypeName, _TypeModule}} ->
 | 
			
		||||
            {ok, atom_to_binary(TypeName)}
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
lookup_type(Class, TypeDescriptor)
 | 
			
		||||
    when is_atom(TypeDescriptor) ->
 | 
			
		||||
    case ets:lookup(?ETS_NAME, {Class, TypeDescriptor}) of
 | 
			
		||||
        [{_, Module}] ->
 | 
			
		||||
            {ok, {TypeDescriptor, Module}};
 | 
			
		||||
        [] ->
 | 
			
		||||
             %% In principle it is enough to do the same sanity check
 | 
			
		||||
             %% we do when registring a type.
 | 
			
		||||
             %% This however will return false positives for loaded
 | 
			
		||||
             %% but unregistered modules.
 | 
			
		||||
             TMMatch = ets:match(?ETS_NAME, {{Class, '$1'}, TypeDescriptor}),
 | 
			
		||||
             case TMMatch of
 | 
			
		||||
                 [[TypeName]] -> {ok, {TypeName, TypeDescriptor}};
 | 
			
		||||
                 [] ->
 | 
			
		||||
                     {error, not_found}
 | 
			
		||||
             end
 | 
			
		||||
    end;
 | 
			
		||||
lookup_type(Class, TypeDescriptor)
 | 
			
		||||
    when is_binary(TypeDescriptor) ->
 | 
			
		||||
    %% when we register a type we convert
 | 
			
		||||
    %% typename to atom so we can lookup
 | 
			
		||||
    %% only existing atoms.
 | 
			
		||||
    lookup_type(Class, binary_to_existing_atom(TypeDescriptor)).
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
lookup_all(Class) ->
 | 
			
		||||
    [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -41,6 +41,19 @@
 | 
			
		|||
         notify_decorators/1
 | 
			
		||||
        ]).
 | 
			
		||||
 | 
			
		||||
-export([queue_topology/1,
 | 
			
		||||
         feature_flag_name/0,
 | 
			
		||||
         policy_apply_to_name/0,
 | 
			
		||||
         can_redeliver/0,
 | 
			
		||||
         stop/1,
 | 
			
		||||
         is_replicated/0,
 | 
			
		||||
         rebalance_module/0,
 | 
			
		||||
         list_with_minimum_quorum/0,
 | 
			
		||||
         drain/1,
 | 
			
		||||
         revive/0,
 | 
			
		||||
         queue_vm_stats_sups/0,
 | 
			
		||||
         dir_base/0]).
 | 
			
		||||
 | 
			
		||||
%% Stateful rabbit_queue_type callbacks are unsupported by this queue type.
 | 
			
		||||
-define(STATEFUL_CALLBACKS,
 | 
			
		||||
        [
 | 
			
		||||
| 
						 | 
				
			
			@ -301,3 +314,43 @@ dequeue(A1,A2,A3,A4,A5) ->
 | 
			
		|||
 | 
			
		||||
state_info(A1) ->
 | 
			
		||||
    ?UNSUPPORTED([A1]).
 | 
			
		||||
 | 
			
		||||
-spec queue_topology(amqqueue:amqqueue()) ->
 | 
			
		||||
    {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
 | 
			
		||||
queue_topology(Q) ->
 | 
			
		||||
    Pid = amqqueue:get_pid(Q),
 | 
			
		||||
    Node = node(Pid),
 | 
			
		||||
    {Node, [Node]}.
 | 
			
		||||
 | 
			
		||||
feature_flag_name() ->
 | 
			
		||||
    undefined.
 | 
			
		||||
 | 
			
		||||
policy_apply_to_name() ->
 | 
			
		||||
    <<"qos0_queues">>.
 | 
			
		||||
 | 
			
		||||
can_redeliver() ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
stop(_VHost) ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
is_replicated() ->
 | 
			
		||||
    false.
 | 
			
		||||
 | 
			
		||||
rebalance_module() ->
 | 
			
		||||
    {error, not_supported}.
 | 
			
		||||
 | 
			
		||||
list_with_minimum_quorum() ->
 | 
			
		||||
    [].
 | 
			
		||||
 | 
			
		||||
drain(_TransferCandidates) ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
revive() ->
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
queue_vm_stats_sups() ->
 | 
			
		||||
    {[], []}.
 | 
			
		||||
 | 
			
		||||
dir_base() ->
 | 
			
		||||
    [].
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue