diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 304a929e55..4d58f669f3 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -464,8 +464,13 @@ filter_per_type(classic, Q) -> %% The assumption is all non-replicated queues %% are filtered before calling this with is_replicated/0 rebalance_module(Q) -> - TypeModule = amqqueue:get_type(Q), - TypeModule:rebalance_module(). + case rabbit_queue_type:rebalance_module(Q) of + undefined -> + rabbit_log:error("Undefined rebalance module for queue type: ~s", [amqqueue:get_type(Q)]), + {error, not_supported}; + RBModule -> + RBModule + end. get_resource_name(#resource{name = Name}) -> Name. diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 17efe78f8d..503b51362e 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -66,10 +66,8 @@ -export([queue_topology/1, policy_apply_to_name/0, - can_redeliver/0, stop/1, is_replicated/0, - rebalance_module/0, list_with_minimum_quorum/0, drain/1, revive/0, @@ -612,7 +610,9 @@ capabilities() -> false -> [] end, consumer_arguments => [<<"x-priority">>], - server_named => true}. + server_named => true, + rebalance_module => undefined, + can_redeliver => false}. notify_decorators(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), @@ -711,9 +711,6 @@ queue_topology(Q) -> policy_apply_to_name() -> <<"classic_queues">>. -can_redeliver() -> - false. - stop(VHost) -> ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -722,9 +719,6 @@ stop(VHost) -> is_replicated() -> false. -rebalance_module() -> - {error, not_supported}. - list_with_minimum_quorum() -> []. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index b2be619c2f..097ceac6ac 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -62,6 +62,7 @@ notify_decorators/1, publish_at_most_once/2, can_redeliver/2, + rebalance_module/1, stop/1, list_with_minimum_quorum/0, drain/1, @@ -268,6 +269,7 @@ -callback format(amqqueue:amqqueue(), Context :: map()) -> [{atom(), term()}]. +%% TODO: mandate keys -callback capabilities() -> #{atom() := term()}. @@ -283,14 +285,10 @@ %% -callback on_node_down(node()) -> ok. --callback can_redeliver() -> boolean(). - -callback stop(rabbit_types:vhost()) -> ok. -callback is_replicated() -> boolean(). --callback rebalance_module() -> module() | {error, not_supported}. - -callback list_with_minimum_quorum() -> [amqqueue:amqqueue()]. -callback drain([node()]) -> ok. @@ -894,10 +892,17 @@ queue_limit_error(Reason, ReasonArgs) -> can_redeliver(Q, State) -> case module(Q, State) of {ok, TypeModule} -> - TypeModule:can_redeliver(); + Capabilities = TypeModule:capabilities(), + maps:get(can_redeliver, Capabilities, false); _ -> false end. +-spec rebalance_module( amqqueue:amqqueue()) -> undefine | module(). +rebalance_module(Q) -> + TypeModule = amqqueue:get_type(Q), + Capabilities = TypeModule:capabilities(), + maps:get(rebalance_module, Capabilities, undefined). + -spec stop(rabbit_types:vhost()) -> ok. stop(VHost) -> %% original rabbit_amqqueue:stop doesn't do any catches or try after diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e5310ea371..71d15ce633 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -79,9 +79,7 @@ -export([queue_topology/1, policy_apply_to_name/0, - can_redeliver/0, is_replicated/0, - rebalance_module/0, drain/1, revive/0, queue_vm_stats_sups/0, @@ -561,7 +559,9 @@ capabilities() -> <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>, <<"x-message-ttl">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-priority">>], - server_named => false}. + server_named => false, + rebalance_module => ?MODULE, + can_redeliver => true}. rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), @@ -2274,15 +2274,9 @@ queue_topology(Q) -> policy_apply_to_name() -> <<"quorum_queues">>. -can_redeliver() -> - true. - is_replicated() -> true. -rebalance_module() -> - ?MODULE. - -spec drain([node()]) -> ok. drain(TransferCandidates) -> _ = transfer_leadership(TransferCandidates), @@ -2338,9 +2332,9 @@ stop_local_quorum_queue_followers() -> rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). revive() -> - revive_local_queue_replicas(). + revive_local_queue_members(). -revive_local_queue_replicas() -> +revive_local_queue_members() -> Queues = rabbit_amqqueue:list_local_followers(), %% NB: this function ignores the first argument so we can just pass the %% empty binary as the vhost name. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 9c63db1478..d4519b50f6 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -61,10 +61,8 @@ -export([queue_topology/1, policy_apply_to_name/0, - can_redeliver/0, stop/1, is_replicated/0, - rebalance_module/0, drain/1, revive/0, queue_vm_stats_sups/0, @@ -1354,7 +1352,9 @@ capabilities() -> %% AMQP property filter expressions %% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>], - server_named => false}. + server_named => false, + rebalance_module => ?MODULE, + can_redeliver => true}. notify_decorators(Q) when ?is_amqqueue(Q) -> %% Not supported @@ -1447,18 +1447,12 @@ queue_topology(Q) -> policy_apply_to_name() -> <<"streams">>. -can_redeliver() -> - true. - stop(_VHost) -> ok. is_replicated() -> true. -rebalance_module() -> - ?MODULE. - drain(TransferCandidates) -> case whereis(rabbit_stream_coordinator) of undefined -> ok;