Merge pull request #14073 from rabbitmq/ik-generalize-rebalance-module-handling

Generalize rebalance module handling
This commit is contained in:
Michael Klishin 2025-06-13 19:36:12 +04:00 committed by GitHub
commit fbcb9fea23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 9 additions and 9 deletions

View File

@ -427,8 +427,12 @@ maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
[Type, VhostSpec, QueueSpec]),
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
NumRunning = length(Running),
TypeModule = case Type of
all -> all;
_ -> rabbit_queue_type:discover(Type)
end,
ToRebalance = [Q || Q <- list(),
filter_per_type(Type, Q),
filter_per_type_for_rebalance(TypeModule, Q),
is_replicable(Q),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
@ -448,14 +452,10 @@ maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
{error, rebalance_in_progress}.
%% Stream queues don't yet support rebalance
filter_per_type(all, Q) ->
?amqqueue_is_quorum(Q) or ?amqqueue_is_stream(Q);
filter_per_type(quorum, Q) ->
?amqqueue_is_quorum(Q);
filter_per_type(stream, Q) ->
?amqqueue_is_stream(Q);
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).
filter_per_type_for_rebalance(all, Q) ->
rabbit_queue_type:rebalance_module(Q) /= undefined;
filter_per_type_for_rebalance(TypeModule, Q) ->
?amqqueue_type_is(Q, TypeModule).
%% TODO: note that it can return {error, not_supported}.
%% this will result in a badmatch. However that's fine