implement recover and arguments for custom queue types

Call the registry for the list of available queue types
and modules instead using only the hardcoded values.
This commit is contained in:
Lajos Gerecs 2023-07-20 14:16:03 +02:00
parent 7d7cb824d0
commit afcbde0cdc
2 changed files with 22 additions and 13 deletions

View File

@ -1094,22 +1094,15 @@ check_queue_version(Val, Args) ->
Error -> Error
end.
-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
known_queue_types() ->
Registered = rabbit_registry:lookup_all(queue),
{QueueTypes, _} = lists:unzip(Registered),
QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
?KNOWN_QUEUE_TYPES ++ QTypeBins.
check_queue_type({longstr, Val}, _Args) ->
case lists:member(Val, known_queue_types()) of
case lists:member(Val, rabbit_queue_type:known_queue_type_names()) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue type '~ts'", [Val])}
end;
check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}};
check_queue_type(Val, _Args) when is_binary(Val) ->
case lists:member(Val, known_queue_types()) of
case lists:member(Val, rabbit_queue_type:known_queue_type_names()) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue type '~ts'", [Val])}
end;

View File

@ -54,7 +54,9 @@
-export([
added_to_rabbit_registry/2,
removed_from_rabbit_registry/1
removed_from_rabbit_registry/1,
known_queue_type_names/0,
known_queue_type_modules/0
]).
-type queue_name() :: rabbit_amqqueue:name().
@ -72,7 +74,8 @@
-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]).
%% TODO resolve all registered queue types from registry
-define(QUEUE_TYPES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]).
-define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]).
-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
%% anything that the host process needs to do on behalf of the queue type session
-type action() ::
@ -366,7 +369,7 @@ is_server_named_allowed(Type) ->
arguments(ArgumentType) ->
Args0 = lists:map(fun(T) ->
maps:get(ArgumentType, T:capabilities(), [])
end, ?QUEUE_TYPES),
end, known_queue_type_modules()),
Args = lists:flatten(Args0),
lists:usort(Args).
@ -435,7 +438,7 @@ is_recoverable(Q) ->
{Recovered :: [amqqueue:amqqueue()],
Failed :: [amqqueue:amqqueue()]}.
recover(VHost, Qs) ->
ByType0 = maps:from_keys(?QUEUE_TYPES, []),
ByType0 = maps:from_keys(known_queue_type_modules(), []),
ByType = lists:foldl(
fun (Q, Acc) ->
T = amqqueue:get_type(Q),
@ -684,3 +687,16 @@ qref(#resource{kind = queue} = QName) ->
QName;
qref(Q) when ?is_amqqueue(Q) ->
amqqueue:get_name(Q).
-spec known_queue_type_modules() -> [module()].
known_queue_type_modules() ->
Registered = rabbit_registry:lookup_all(queue),
{_, Modules} = lists:unzip(Registered),
?QUEUE_MODULES ++ Modules.
-spec known_queue_type_names() -> [binary()].
known_queue_type_names() ->
Registered = rabbit_registry:lookup_all(queue),
{QueueTypes, _} = lists:unzip(Registered),
QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
?KNOWN_QUEUE_TYPES ++ QTypeBins.