diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 2783079128..c2f954ccbf 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -427,8 +427,12 @@ maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> [Type, VhostSpec, QueueSpec]), Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()), NumRunning = length(Running), + TypeModule = case Type of + all -> all; + _ -> rabbit_queue_type:discover(Type) + end, ToRebalance = [Q || Q <- list(), - filter_per_type(Type, Q), + filter_per_type_for_rebalance(TypeModule, Q), is_replicable(Q), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], @@ -448,14 +452,10 @@ maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> {error, rebalance_in_progress}. %% Stream queues don't yet support rebalance -filter_per_type(all, Q) -> - ?amqqueue_is_quorum(Q) or ?amqqueue_is_stream(Q); -filter_per_type(quorum, Q) -> - ?amqqueue_is_quorum(Q); -filter_per_type(stream, Q) -> - ?amqqueue_is_stream(Q); -filter_per_type(classic, Q) -> - ?amqqueue_is_classic(Q). +filter_per_type_for_rebalance(all, Q) -> + rabbit_queue_type:rebalance_module(Q) /= undefined; +filter_per_type_for_rebalance(TypeModule, Q) -> + ?amqqueue_type_is(Q, TypeModule). %% TODO: note that it can return {error, not_supported}. %% this will result in a badmatch. However that's fine