Queues with plugins - move is_(replicated->replicable) to capabilities

This commit is contained in:
Iliia Khaprov 2025-05-08 15:36:27 +02:00
parent 9ef170f4e7
commit 1eeaef4874
No known key found for this signature in database
GPG Key ID: 4DCFF8F358E49AED
5 changed files with 34 additions and 29 deletions

View File

@ -37,7 +37,7 @@
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_match/2, is_in_virtual_host/2]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
list_local_stream_queues/0, list_stream_queues_on/1,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
@ -421,7 +421,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
%% TODO: classic queues do not support rebalancing, it looks like they are simply
%% filtered out with is_replicated(Q). Maybe error instead?
%% filtered out with is_replicable(Q). Maybe error instead?
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
[Type, VhostSpec, QueueSpec]),
@ -429,7 +429,7 @@ maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
NumRunning = length(Running),
ToRebalance = [Q || Q <- list(),
filter_per_type(Type, Q),
is_replicated(Q),
is_replicable(Q),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
NumToRebalance = length(ToRebalance),
@ -462,7 +462,7 @@ filter_per_type(classic, Q) ->
%% for now because the original function will fail with
%% bad clause if called with classical queue.
%% The assumption is all non-replicated queues
%% are filtered before calling this with is_replicated/0
%% are filtered before calling this with is_replicable/0
rebalance_module(Q) ->
case rabbit_queue_type:rebalance_module(Q) of
undefined ->
@ -1922,11 +1922,10 @@ forget_node_for_queue(Q) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
-spec is_replicable(amqqueue:amqqueue()) -> boolean().
is_replicated(Q) ->
TypeModule = amqqueue:get_type(Q),
TypeModule:is_replicated().
is_replicable(Q) ->
rabbit_queue_type:is_replicable(Q).
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
@ -1996,7 +1995,7 @@ filter_transient_queues_to_delete(Node) ->
amqqueue:qnode(Q) == Node andalso
not rabbit_process:is_process_alive(amqqueue:get_pid(Q))
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
andalso (not is_replicated(Q)
andalso (not is_replicable(Q)
orelse is_dead_exclusive(Q))
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
end.

View File

@ -67,7 +67,6 @@
-export([queue_topology/1,
policy_apply_to_name/0,
stop/1,
is_replicated/0,
list_with_minimum_quorum/0,
drain/1,
revive/0,
@ -612,7 +611,9 @@ capabilities() ->
consumer_arguments => [<<"x-priority">>],
server_named => true,
rebalance_module => undefined,
can_redeliver => false}.
can_redeliver => false,
is_replicable => false
}.
notify_decorators(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
@ -716,9 +717,6 @@ stop(VHost) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop(VHost).
is_replicated() ->
false.
list_with_minimum_quorum() ->
[].

View File

@ -63,6 +63,7 @@
publish_at_most_once/2,
can_redeliver/2,
rebalance_module/1,
is_replicable/1,
stop/1,
list_with_minimum_quorum/0,
drain/1,
@ -269,9 +270,16 @@
-callback format(amqqueue:amqqueue(), Context :: map()) ->
[{atom(), term()}].
%% TODO: mandate keys
%% TODO: replace binary() with real types?
-callback capabilities() ->
#{atom() := term()}.
#{unsupported_policies := [binary()],
queue_arguments := [binary()],
consumer_arguments := [binary()],
amqp_capabilities => [binary()],
server_named := boolean(),
rebalance_module := module(),
can_redeliver := boolean(),
is_replicable := boolean()}.
-callback notify_decorators(amqqueue:amqqueue()) ->
ok.
@ -287,8 +295,6 @@
-callback stop(rabbit_types:vhost()) -> ok.
-callback is_replicated() -> boolean().
-callback list_with_minimum_quorum() -> [amqqueue:amqqueue()].
-callback drain([node()]) -> ok.
@ -897,12 +903,18 @@ can_redeliver(Q, State) ->
_ -> false
end.
-spec rebalance_module( amqqueue:amqqueue()) -> undefine | module().
-spec rebalance_module(amqqueue:amqqueue()) -> undefine | module().
rebalance_module(Q) ->
TypeModule = amqqueue:get_type(Q),
Capabilities = TypeModule:capabilities(),
maps:get(rebalance_module, Capabilities, undefined).
-spec is_replicable(amqqueue:amqqueue()) -> undefine | module().
is_replicable(Q) ->
TypeModule = amqqueue:get_type(Q),
Capabilities = TypeModule:capabilities(),
maps:get(is_replicable, Capabilities, false).
-spec stop(rabbit_types:vhost()) -> ok.
stop(VHost) ->
%% original rabbit_amqqueue:stop doesn't do any catches or try after

View File

@ -79,7 +79,6 @@
-export([queue_topology/1,
policy_apply_to_name/0,
is_replicated/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
@ -561,7 +560,9 @@ capabilities() ->
consumer_arguments => [<<"x-priority">>],
server_named => false,
rebalance_module => ?MODULE,
can_redeliver => true}.
can_redeliver => true,
is_replicable => true
}.
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
@ -2274,9 +2275,6 @@ queue_topology(Q) ->
policy_apply_to_name() ->
<<"quorum_queues">>.
is_replicated() ->
true.
-spec drain([node()]) -> ok.
drain(TransferCandidates) ->
_ = transfer_leadership(TransferCandidates),

View File

@ -62,7 +62,6 @@
-export([queue_topology/1,
policy_apply_to_name/0,
stop/1,
is_replicated/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
@ -1354,7 +1353,9 @@ capabilities() ->
amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>],
server_named => false,
rebalance_module => ?MODULE,
can_redeliver => true}.
can_redeliver => true,
is_replicable => true
}.
notify_decorators(Q) when ?is_amqqueue(Q) ->
%% Not supported
@ -1454,9 +1455,6 @@ policy_apply_to_name() ->
stop(_VHost) ->
ok.
is_replicated() ->
true.
drain(TransferCandidates) ->
case whereis(rabbit_stream_coordinator) of
undefined -> ok;