Queues with plugins - address Karl's comments

revive_local_queue_replicas -> revive_local_queue_members
can_redeliver converted from callback to capabilities key
rebalance_moduled converted from callback to capabilities key
This commit is contained in:
Iliia Khaprov 2025-04-07 21:05:53 +02:00
parent cfd51bac6c
commit 34f0d12dab
No known key found for this signature in database
GPG Key ID: 4DCFF8F358E49AED
5 changed files with 28 additions and 36 deletions

View File

@ -464,8 +464,13 @@ filter_per_type(classic, Q) ->
%% The assumption is all non-replicated queues
%% are filtered before calling this with is_replicated/0
rebalance_module(Q) ->
TypeModule = amqqueue:get_type(Q),
TypeModule:rebalance_module().
case rabbit_queue_type:rebalance_module(Q) of
undefined ->
rabbit_log:error("Undefined rebalance module for queue type: ~s", [amqqueue:get_type(Q)]),
{error, not_supported};
RBModule ->
RBModule
end.
get_resource_name(#resource{name = Name}) ->
Name.

View File

@ -66,10 +66,8 @@
-export([queue_topology/1,
policy_apply_to_name/0,
can_redeliver/0,
stop/1,
is_replicated/0,
rebalance_module/0,
list_with_minimum_quorum/0,
drain/1,
revive/0,
@ -612,7 +610,9 @@ capabilities() ->
false -> []
end,
consumer_arguments => [<<"x-priority">>],
server_named => true}.
server_named => true,
rebalance_module => undefined,
can_redeliver => false}.
notify_decorators(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
@ -711,9 +711,6 @@ queue_topology(Q) ->
policy_apply_to_name() ->
<<"classic_queues">>.
can_redeliver() ->
false.
stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@ -722,9 +719,6 @@ stop(VHost) ->
is_replicated() ->
false.
rebalance_module() ->
{error, not_supported}.
list_with_minimum_quorum() ->
[].

View File

@ -62,6 +62,7 @@
notify_decorators/1,
publish_at_most_once/2,
can_redeliver/2,
rebalance_module/1,
stop/1,
list_with_minimum_quorum/0,
drain/1,
@ -268,6 +269,7 @@
-callback format(amqqueue:amqqueue(), Context :: map()) ->
[{atom(), term()}].
%% TODO: mandate keys
-callback capabilities() ->
#{atom() := term()}.
@ -283,14 +285,10 @@
%% -callback on_node_down(node()) -> ok.
-callback can_redeliver() -> boolean().
-callback stop(rabbit_types:vhost()) -> ok.
-callback is_replicated() -> boolean().
-callback rebalance_module() -> module() | {error, not_supported}.
-callback list_with_minimum_quorum() -> [amqqueue:amqqueue()].
-callback drain([node()]) -> ok.
@ -894,10 +892,17 @@ queue_limit_error(Reason, ReasonArgs) ->
can_redeliver(Q, State) ->
case module(Q, State) of
{ok, TypeModule} ->
TypeModule:can_redeliver();
Capabilities = TypeModule:capabilities(),
maps:get(can_redeliver, Capabilities, false);
_ -> false
end.
-spec rebalance_module( amqqueue:amqqueue()) -> undefine | module().
rebalance_module(Q) ->
TypeModule = amqqueue:get_type(Q),
Capabilities = TypeModule:capabilities(),
maps:get(rebalance_module, Capabilities, undefined).
-spec stop(rabbit_types:vhost()) -> ok.
stop(VHost) ->
%% original rabbit_amqqueue:stop doesn't do any catches or try after

View File

@ -79,9 +79,7 @@
-export([queue_topology/1,
policy_apply_to_name/0,
can_redeliver/0,
is_replicated/0,
rebalance_module/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
@ -561,7 +559,9 @@ capabilities() ->
<<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>,
<<"x-message-ttl">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-priority">>],
server_named => false}.
server_named => false,
rebalance_module => ?MODULE,
can_redeliver => true}.
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
@ -2274,15 +2274,9 @@ queue_topology(Q) ->
policy_apply_to_name() ->
<<"quorum_queues">>.
can_redeliver() ->
true.
is_replicated() ->
true.
rebalance_module() ->
?MODULE.
-spec drain([node()]) -> ok.
drain(TransferCandidates) ->
_ = transfer_leadership(TransferCandidates),
@ -2338,9 +2332,9 @@ stop_local_quorum_queue_followers() ->
rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
revive() ->
revive_local_queue_replicas().
revive_local_queue_members().
revive_local_queue_replicas() ->
revive_local_queue_members() ->
Queues = rabbit_amqqueue:list_local_followers(),
%% NB: this function ignores the first argument so we can just pass the
%% empty binary as the vhost name.

View File

@ -61,10 +61,8 @@
-export([queue_topology/1,
policy_apply_to_name/0,
can_redeliver/0,
stop/1,
is_replicated/0,
rebalance_module/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
@ -1354,7 +1352,9 @@ capabilities() ->
%% AMQP property filter expressions
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>],
server_named => false}.
server_named => false,
rebalance_module => ?MODULE,
can_redeliver => true}.
notify_decorators(Q) when ?is_amqqueue(Q) ->
%% Not supported
@ -1447,18 +1447,12 @@ queue_topology(Q) ->
policy_apply_to_name() ->
<<"streams">>.
can_redeliver() ->
true.
stop(_VHost) ->
ok.
is_replicated() ->
true.
rebalance_module() ->
?MODULE.
drain(TransferCandidates) ->
case whereis(rabbit_stream_coordinator) of
undefined -> ok;