Merge pull request #5305 from rabbitmq/default-queue-type-per-vhost
Configure default queue type by vhost
This commit is contained in:
		
						commit
						4cacec6bfd
					
				| 
						 | 
				
			
			@ -17,6 +17,7 @@
 | 
			
		|||
         not_found_or_absent/1, not_found_or_absent_dirty/1,
 | 
			
		||||
         with/2, with/3, with_or_die/2,
 | 
			
		||||
         assert_equivalence/5,
 | 
			
		||||
         augment_declare_args/5,
 | 
			
		||||
         check_exclusive_access/2, with_exclusive_access_or_die/3,
 | 
			
		||||
         stat/1, deliver/2,
 | 
			
		||||
         requeue/3, ack/3, reject/4]).
 | 
			
		||||
| 
						 | 
				
			
			@ -729,6 +730,36 @@ assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) ->
 | 
			
		|||
    ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete),
 | 
			
		||||
    ok = assert_args_equivalence(Q, Args1).
 | 
			
		||||
 | 
			
		||||
-spec augment_declare_args(vhost:name(), boolean(),
 | 
			
		||||
                           boolean(), boolean(),
 | 
			
		||||
                           rabbit_framing:amqp_table()) ->
 | 
			
		||||
    rabbit_framing:amqp_table().
 | 
			
		||||
augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
 | 
			
		||||
    V = rabbit_vhost:lookup(VHost),
 | 
			
		||||
    HasQTypeArg = rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) =/= undefined,
 | 
			
		||||
    case vhost:get_metadata(V) of
 | 
			
		||||
        #{default_queue_type := DefaultQueueType}
 | 
			
		||||
          when is_binary(DefaultQueueType) andalso
 | 
			
		||||
               not HasQTypeArg ->
 | 
			
		||||
            Type = rabbit_queue_type:discover(DefaultQueueType),
 | 
			
		||||
            case rabbit_queue_type:is_compatible(Type, Durable,
 | 
			
		||||
                                                 Exclusive, AutoDelete) of
 | 
			
		||||
                true ->
 | 
			
		||||
                    %% patch up declare arguments with x-queue-type if there
 | 
			
		||||
                    %% is a vhost default set the queue is druable and not exclusive
 | 
			
		||||
                    %% and there is no queue type argument
 | 
			
		||||
                    %% present
 | 
			
		||||
                    rabbit_misc:set_table_value(Args0,
 | 
			
		||||
                                                <<"x-queue-type">>,
 | 
			
		||||
                                                longstr,
 | 
			
		||||
                                                DefaultQueueType);
 | 
			
		||||
                false ->
 | 
			
		||||
                    Args0
 | 
			
		||||
            end;
 | 
			
		||||
        _ ->
 | 
			
		||||
            Args0
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
 | 
			
		||||
          'ok' | rabbit_types:channel_exit().
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2519,13 +2519,18 @@ handle_method(#'queue.declare'{queue       = QueueNameBin,
 | 
			
		|||
                               exclusive   = ExclusiveDeclare,
 | 
			
		||||
                               auto_delete = AutoDelete,
 | 
			
		||||
                               nowait      = NoWait,
 | 
			
		||||
                               arguments   = Args} = Declare,
 | 
			
		||||
                               arguments   = Args0} = Declare,
 | 
			
		||||
              ConnPid, AuthzContext, CollectorPid, VHostPath,
 | 
			
		||||
              #user{username = Username} = User) ->
 | 
			
		||||
    Owner = case ExclusiveDeclare of
 | 
			
		||||
                true  -> ConnPid;
 | 
			
		||||
                false -> none
 | 
			
		||||
            end,
 | 
			
		||||
    Args = rabbit_amqqueue:augment_declare_args(VHostPath,
 | 
			
		||||
                                                DurableDeclare,
 | 
			
		||||
                                                ExclusiveDeclare,
 | 
			
		||||
                                                AutoDelete,
 | 
			
		||||
                                                Args0),
 | 
			
		||||
    StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
 | 
			
		||||
    Durable = DurableDeclare andalso not ExclusiveDeclare,
 | 
			
		||||
    ActualNameBin = case StrippedQueueNameBin of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,6 +20,7 @@
 | 
			
		|||
 | 
			
		||||
-export([
 | 
			
		||||
         is_enabled/0,
 | 
			
		||||
         is_compatible/3,
 | 
			
		||||
         declare/2,
 | 
			
		||||
         delete/4,
 | 
			
		||||
         is_recoverable/1,
 | 
			
		||||
| 
						 | 
				
			
			@ -53,6 +54,10 @@
 | 
			
		|||
 | 
			
		||||
is_enabled() -> true.
 | 
			
		||||
 | 
			
		||||
-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
 | 
			
		||||
is_compatible(_, _, _) ->
 | 
			
		||||
    true.
 | 
			
		||||
 | 
			
		||||
declare(Q, Node) when ?amqqueue_is_classic(Q) ->
 | 
			
		||||
    QName = amqqueue:get_name(Q),
 | 
			
		||||
    VHost = amqqueue:get_vhost(Q),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,6 +15,7 @@
 | 
			
		|||
         discover/1,
 | 
			
		||||
         default/0,
 | 
			
		||||
         is_enabled/1,
 | 
			
		||||
         is_compatible/4,
 | 
			
		||||
         declare/2,
 | 
			
		||||
         delete/4,
 | 
			
		||||
         is_recoverable/1,
 | 
			
		||||
| 
						 | 
				
			
			@ -127,6 +128,11 @@
 | 
			
		|||
%% is the queue type feature enabled
 | 
			
		||||
-callback is_enabled() -> boolean().
 | 
			
		||||
 | 
			
		||||
-callback is_compatible(Durable :: boolean(),
 | 
			
		||||
                        Exclusive :: boolean(),
 | 
			
		||||
                        AutoDelete :: boolean()) ->
 | 
			
		||||
    boolean().
 | 
			
		||||
 | 
			
		||||
-callback declare(amqqueue:amqqueue(), node()) ->
 | 
			
		||||
    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
 | 
			
		||||
    {'absent', amqqueue:amqqueue(), absent_reason()} |
 | 
			
		||||
| 
						 | 
				
			
			@ -232,6 +238,11 @@ default() ->
 | 
			
		|||
is_enabled(Type) ->
 | 
			
		||||
    Type:is_enabled().
 | 
			
		||||
 | 
			
		||||
-spec is_compatible(module(), boolean(), boolean(), boolean()) ->
 | 
			
		||||
    boolean().
 | 
			
		||||
is_compatible(Type, Durable, Exclusive, AutoDelete) ->
 | 
			
		||||
    Type:is_compatible(Durable, Exclusive, AutoDelete).
 | 
			
		||||
 | 
			
		||||
-spec declare(amqqueue:amqqueue(), node()) ->
 | 
			
		||||
    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
 | 
			
		||||
    {'absent', amqqueue:amqqueue(), absent_reason()} |
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -64,6 +64,7 @@
 | 
			
		|||
         spawn_notify_decorators/3]).
 | 
			
		||||
 | 
			
		||||
-export([is_enabled/0,
 | 
			
		||||
         is_compatible/3,
 | 
			
		||||
         declare/2]).
 | 
			
		||||
 | 
			
		||||
-import(rabbit_queue_type_util, [args_policy_lookup/3,
 | 
			
		||||
| 
						 | 
				
			
			@ -116,7 +117,13 @@
 | 
			
		|||
is_enabled() ->
 | 
			
		||||
    rabbit_feature_flags:is_enabled(quorum_queue).
 | 
			
		||||
 | 
			
		||||
%%----------------------------------------------------------------------------
 | 
			
		||||
-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
 | 
			
		||||
is_compatible(_Durable = true,
 | 
			
		||||
              _Exclusive = false,
 | 
			
		||||
              _AutoDelete = false) ->
 | 
			
		||||
    true;
 | 
			
		||||
is_compatible(_, _, _) ->
 | 
			
		||||
    false.
 | 
			
		||||
 | 
			
		||||
-spec init(amqqueue:amqqueue()) -> {ok, rabbit_fifo_client:state()}.
 | 
			
		||||
init(Q) when ?is_amqqueue(Q) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,6 +10,7 @@
 | 
			
		|||
-behaviour(rabbit_queue_type).
 | 
			
		||||
 | 
			
		||||
-export([is_enabled/0,
 | 
			
		||||
         is_compatible/3,
 | 
			
		||||
         declare/2,
 | 
			
		||||
         delete/4,
 | 
			
		||||
         purge/1,
 | 
			
		||||
| 
						 | 
				
			
			@ -87,6 +88,15 @@
 | 
			
		|||
is_enabled() ->
 | 
			
		||||
    rabbit_feature_flags:is_enabled(stream_queue).
 | 
			
		||||
 | 
			
		||||
-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
 | 
			
		||||
is_compatible(_Durable = true,
 | 
			
		||||
              _Exclusive = false,
 | 
			
		||||
              _AutoDelete = false) ->
 | 
			
		||||
    true;
 | 
			
		||||
is_compatible(_, _, _) ->
 | 
			
		||||
    false.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-spec declare(amqqueue:amqqueue(), node()) ->
 | 
			
		||||
    {'new' | 'existing', amqqueue:amqqueue()} |
 | 
			
		||||
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,7 @@
 | 
			
		|||
-include("vhost.hrl").
 | 
			
		||||
 | 
			
		||||
-export([recover/0, recover/1, read_config/1]).
 | 
			
		||||
-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
 | 
			
		||||
-export([add/2, add/3, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
 | 
			
		||||
         set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
 | 
			
		||||
        list/0, count/0, list_names/0, all/0, all_tagged_with/1]).
 | 
			
		||||
-export([parse_tags/1, update_metadata/2, tag_with/2, untag_from/2, update_tags/2, update_tags/3]).
 | 
			
		||||
| 
						 | 
				
			
			@ -20,7 +20,8 @@
 | 
			
		|||
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, config_file_path/1, ensure_config_file/1]).
 | 
			
		||||
-export([delete_storage/1]).
 | 
			
		||||
-export([vhost_down/1]).
 | 
			
		||||
-export([put_vhost/5]).
 | 
			
		||||
-export([put_vhost/5,
 | 
			
		||||
         put_vhost/6]).
 | 
			
		||||
 | 
			
		||||
%%
 | 
			
		||||
%% API
 | 
			
		||||
| 
						 | 
				
			
			@ -138,34 +139,55 @@ parse_tags(Val) when is_list(Val) ->
 | 
			
		|||
        [trim_tag(Tag) || Tag <- re:split(Val, ",", [{return, list}])]
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec add(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
 | 
			
		||||
 | 
			
		||||
-spec add(vhost:name(), rabbit_types:username()) ->
 | 
			
		||||
    rabbit_types:ok_or_error(any()).
 | 
			
		||||
add(VHost, ActingUser) ->
 | 
			
		||||
    case exists(VHost) of
 | 
			
		||||
        true  -> ok;
 | 
			
		||||
        false -> do_add(VHost, <<"">>, [], ActingUser)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec add(vhost:name(), binary(), [atom()], rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
 | 
			
		||||
    add(VHost, #{}, ActingUser).
 | 
			
		||||
 | 
			
		||||
-spec add(vhost:name(), binary(), [atom()], rabbit_types:username()) ->
 | 
			
		||||
    rabbit_types:ok_or_error(any()).
 | 
			
		||||
add(Name, Description, Tags, ActingUser) ->
 | 
			
		||||
    add(Name, #{description => Description,
 | 
			
		||||
                tags => Tags}, ActingUser).
 | 
			
		||||
 | 
			
		||||
-spec add(vhost:name(), vhost:metadata(), rabbit_types:username()) ->
 | 
			
		||||
    rabbit_types:ok_or_error(any()).
 | 
			
		||||
add(Name, Metadata, ActingUser) ->
 | 
			
		||||
    case exists(Name) of
 | 
			
		||||
        true  -> ok;
 | 
			
		||||
        false -> do_add(Name, Description, Tags, ActingUser)
 | 
			
		||||
        false ->
 | 
			
		||||
            catch(do_add(Name, Metadata, ActingUser))
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
do_add(Name, Description, Tags, ActingUser) ->
 | 
			
		||||
do_add(Name, Metadata, ActingUser) ->
 | 
			
		||||
    Description = maps:get(description, Metadata, undefined),
 | 
			
		||||
    Tags = maps:get(tags, Metadata, []),
 | 
			
		||||
 | 
			
		||||
    %% validate default_queue_type
 | 
			
		||||
    case Metadata of
 | 
			
		||||
        #{default_queue_type := DQT} ->
 | 
			
		||||
            try rabbit_queue_type:discover(DQT) of
 | 
			
		||||
                _ ->
 | 
			
		||||
                    ok
 | 
			
		||||
            catch _:_ ->
 | 
			
		||||
                      throw({error, invalid_queue_type})
 | 
			
		||||
            end;
 | 
			
		||||
        _ ->
 | 
			
		||||
            ok
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    case Description of
 | 
			
		||||
        undefined ->
 | 
			
		||||
            rabbit_log:info("Adding vhost '~s' without a description", [Name]);
 | 
			
		||||
        Value ->
 | 
			
		||||
            rabbit_log:info("Adding vhost '~s' (description: '~s', tags: ~p)", [Name, Value, Tags])
 | 
			
		||||
        Description ->
 | 
			
		||||
            rabbit_log:info("Adding vhost '~s' (description: '~s', tags: ~p)",
 | 
			
		||||
                            [Name, Description, Tags])
 | 
			
		||||
    end,
 | 
			
		||||
    VHost = rabbit_misc:execute_mnesia_transaction(
 | 
			
		||||
          fun () ->
 | 
			
		||||
                  case mnesia:wread({rabbit_vhost, Name}) of
 | 
			
		||||
                      [] ->
 | 
			
		||||
                        Row = vhost:new(Name, [], #{description => Description, tags => Tags}),
 | 
			
		||||
                        Row = vhost:new(Name, [], Metadata),
 | 
			
		||||
                        rabbit_log:debug("Inserting a virtual host record ~p", [Row]),
 | 
			
		||||
                        ok = mnesia:write(rabbit_vhost, Row, write),
 | 
			
		||||
                        Row;
 | 
			
		||||
| 
						 | 
				
			
			@ -259,6 +281,9 @@ delete(VHost, ActingUser) ->
 | 
			
		|||
    ok.
 | 
			
		||||
 | 
			
		||||
put_vhost(Name, Description, Tags0, Trace, Username) ->
 | 
			
		||||
    put_vhost(Name, Description, Tags0, undefined, Trace, Username).
 | 
			
		||||
 | 
			
		||||
put_vhost(Name, Description, Tags0, DefaultQueueType, Trace, Username) ->
 | 
			
		||||
    Tags = case Tags0 of
 | 
			
		||||
      undefined   -> <<"">>;
 | 
			
		||||
      null        -> <<"">>;
 | 
			
		||||
| 
						 | 
				
			
			@ -269,19 +294,32 @@ put_vhost(Name, Description, Tags0, Trace, Username) ->
 | 
			
		|||
    ParsedTags = parse_tags(Tags),
 | 
			
		||||
    rabbit_log:debug("Parsed tags ~p to ~p", [Tags, ParsedTags]),
 | 
			
		||||
    Result = case exists(Name) of
 | 
			
		||||
        true  ->
 | 
			
		||||
            update(Name, Description, ParsedTags, Username);
 | 
			
		||||
        false ->
 | 
			
		||||
            add(Name, Description, ParsedTags, Username),
 | 
			
		||||
             %% wait for up to 45 seconds for the vhost to initialise
 | 
			
		||||
             %% on all nodes
 | 
			
		||||
             case await_running_on_all_nodes(Name, 45000) of
 | 
			
		||||
                 ok               ->
 | 
			
		||||
                     maybe_grant_full_permissions(Name, Username);
 | 
			
		||||
                 {error, timeout} ->
 | 
			
		||||
                     {error, timeout}
 | 
			
		||||
             end
 | 
			
		||||
    end,
 | 
			
		||||
                 true  ->
 | 
			
		||||
                     update(Name, Description, ParsedTags, Username);
 | 
			
		||||
                 false ->
 | 
			
		||||
                     Metadata0 = #{description => Description,
 | 
			
		||||
                                   tags => ParsedTags},
 | 
			
		||||
                     Metadata = case DefaultQueueType of
 | 
			
		||||
                                    undefined ->
 | 
			
		||||
                                        Metadata0;
 | 
			
		||||
                                    _ ->
 | 
			
		||||
                                        Metadata0#{default_queue_type =>
 | 
			
		||||
                                                       DefaultQueueType}
 | 
			
		||||
                                end,
 | 
			
		||||
                     case add(Name, Metadata, Username) of
 | 
			
		||||
                         ok ->
 | 
			
		||||
                             %% wait for up to 45 seconds for the vhost to initialise
 | 
			
		||||
                             %% on all nodes
 | 
			
		||||
                             case await_running_on_all_nodes(Name, 45000) of
 | 
			
		||||
                                 ok               ->
 | 
			
		||||
                                     maybe_grant_full_permissions(Name, Username);
 | 
			
		||||
                                 {error, timeout} ->
 | 
			
		||||
                                     {error, timeout}
 | 
			
		||||
                             end;
 | 
			
		||||
                         Err ->
 | 
			
		||||
                             Err
 | 
			
		||||
                     end
 | 
			
		||||
             end,
 | 
			
		||||
    case Trace of
 | 
			
		||||
        true      -> rabbit_trace:start(Name);
 | 
			
		||||
        false     -> rabbit_trace:stop(Name);
 | 
			
		||||
| 
						 | 
				
			
			@ -492,20 +530,20 @@ update_tags(VHostName, Tags) ->
 | 
			
		|||
    ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
 | 
			
		||||
    update(VHostName, fun(Record) ->
 | 
			
		||||
        Meta0 = vhost:get_metadata(Record),
 | 
			
		||||
        Meta  = maps:update(tags, ConvertedTags, Meta0),
 | 
			
		||||
        Meta  = maps:put(tags, ConvertedTags, Meta0),
 | 
			
		||||
        vhost:set_metadata(Record, Meta)
 | 
			
		||||
    end).
 | 
			
		||||
 | 
			
		||||
-spec tag_with(vhost:name(), [atom()]) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
 | 
			
		||||
tag_with(VHostName, Tags) when is_list(Tags) ->
 | 
			
		||||
    update_metadata(VHostName, fun(#{tags := Tags0} = Meta) ->
 | 
			
		||||
        maps:update(tags, lists:usort(Tags0 ++ Tags), Meta)
 | 
			
		||||
        maps:put(tags, lists:usort(Tags0 ++ Tags), Meta)
 | 
			
		||||
    end).
 | 
			
		||||
 | 
			
		||||
-spec untag_from(vhost:name(), [atom()]) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
 | 
			
		||||
untag_from(VHostName, Tags) when is_list(Tags) ->
 | 
			
		||||
    update_metadata(VHostName, fun(#{tags := Tags0} = Meta) ->
 | 
			
		||||
        maps:update(tags, lists:usort(Tags0 -- Tags), Meta)
 | 
			
		||||
        maps:put(tags, lists:usort(Tags0 -- Tags), Meta)
 | 
			
		||||
    end).
 | 
			
		||||
 | 
			
		||||
set_limits(VHost, undefined) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -546,6 +584,7 @@ i(tracing, VHost) -> rabbit_trace:enabled(vhost:get_name(VHost));
 | 
			
		|||
i(cluster_state, VHost) -> vhost_cluster_state(vhost:get_name(VHost));
 | 
			
		||||
i(description, VHost) -> vhost:get_description(VHost);
 | 
			
		||||
i(tags, VHost) -> vhost:get_tags(VHost);
 | 
			
		||||
i(default_queue_type, VHost) -> vhost:get_default_queue_type(VHost);
 | 
			
		||||
i(metadata, VHost) -> vhost:get_metadata(VHost);
 | 
			
		||||
i(Item, VHost)     ->
 | 
			
		||||
  rabbit_log:error("Don't know how to compute a virtual host info item '~s' for virtual host '~p'", [Item, VHost]),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -25,6 +25,7 @@
 | 
			
		|||
  get_metadata/1,
 | 
			
		||||
  get_description/1,
 | 
			
		||||
  get_tags/1,
 | 
			
		||||
  get_default_queue_type/1,
 | 
			
		||||
  set_limits/2,
 | 
			
		||||
  set_metadata/2,
 | 
			
		||||
  merge_metadata/2,
 | 
			
		||||
| 
						 | 
				
			
			@ -130,8 +131,16 @@ info_keys() ->
 | 
			
		|||
    case record_version_to_use() of
 | 
			
		||||
        %% note: this reports description and tags separately even though
 | 
			
		||||
        %% they are stored in the metadata map. MK.
 | 
			
		||||
        ?record_version -> [name, description, tags, metadata, tracing, cluster_state];
 | 
			
		||||
        _               -> vhost_v1:info_keys()
 | 
			
		||||
        ?record_version ->
 | 
			
		||||
            [name,
 | 
			
		||||
             description,
 | 
			
		||||
             tags,
 | 
			
		||||
             default_queue_type,
 | 
			
		||||
             metadata,
 | 
			
		||||
             tracing,
 | 
			
		||||
             cluster_state];
 | 
			
		||||
        _ ->
 | 
			
		||||
            vhost_v1:info_keys()
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec pattern_match_all() -> vhost_pattern().
 | 
			
		||||
| 
						 | 
				
			
			@ -166,6 +175,12 @@ get_tags(#vhost{} = VHost) ->
 | 
			
		|||
get_tags(VHost) ->
 | 
			
		||||
    vhost_v1:get_tags(VHost).
 | 
			
		||||
 | 
			
		||||
-spec get_default_queue_type(vhost()) -> binary() | undefined.
 | 
			
		||||
get_default_queue_type(#vhost{} = VHost) ->
 | 
			
		||||
    maps:get(default_queue_type, get_metadata(VHost), undefined);
 | 
			
		||||
get_default_queue_type(_VHost) ->
 | 
			
		||||
    undefined.
 | 
			
		||||
 | 
			
		||||
set_limits(VHost, Value) ->
 | 
			
		||||
    case record_version_to_use() of
 | 
			
		||||
      ?record_version ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -125,6 +125,7 @@ all_tests() ->
 | 
			
		|||
     cancel_sync_queue,
 | 
			
		||||
     idempotent_recover,
 | 
			
		||||
     vhost_with_quorum_queue_is_deleted,
 | 
			
		||||
     vhost_with_default_queue_type_declares_quorum_queue,
 | 
			
		||||
     delete_immediately_by_resource,
 | 
			
		||||
     consume_redelivery_count,
 | 
			
		||||
     subscribe_redelivery_count,
 | 
			
		||||
| 
						 | 
				
			
			@ -223,6 +224,8 @@ init_per_group(Group, Config) ->
 | 
			
		|||
                            timer:sleep(ClusterSize * 1000),
 | 
			
		||||
                            ok = rabbit_ct_broker_helpers:enable_feature_flag(
 | 
			
		||||
                                   Config2, maintenance_mode_status),
 | 
			
		||||
                            ok = rabbit_ct_broker_helpers:enable_feature_flag(
 | 
			
		||||
                                   Config2, virtual_host_metadata),
 | 
			
		||||
                            Config2;
 | 
			
		||||
                        Skip ->
 | 
			
		||||
                            end_per_group(Group, Config2),
 | 
			
		||||
| 
						 | 
				
			
			@ -705,6 +708,61 @@ vhost_with_quorum_queue_is_deleted(Config) ->
 | 
			
		|||
    undefined = rpc:call(Node, ra_directory, where_is, [quorum_queues, RaName]),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
vhost_with_default_queue_type_declares_quorum_queue(Config) ->
 | 
			
		||||
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
 | 
			
		||||
    VHost = atom_to_binary(?FUNCTION_NAME, utf8),
 | 
			
		||||
    QName = atom_to_binary(?FUNCTION_NAME, utf8),
 | 
			
		||||
    User = ?config(rmq_username, Config),
 | 
			
		||||
 | 
			
		||||
    AddVhostArgs = [VHost, #{default_queue_type => <<"quorum">>}, User],
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, add,
 | 
			
		||||
                                      AddVhostArgs),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
 | 
			
		||||
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
 | 
			
		||||
    {ok, Ch} = amqp_connection:open_channel(Conn),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
 | 
			
		||||
    assert_queue_type(Node, VHost, QName, rabbit_quorum_queue),
 | 
			
		||||
    %% declaring again without a queue arg is ok
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
 | 
			
		||||
    %% also using an explicit queue type should be ok
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QName, 0, 0},
 | 
			
		||||
                 declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 | 
			
		||||
    %% passive should work without x-queue-type
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare_passive(Ch, QName, [])),
 | 
			
		||||
    %% passive with x-queue-type also should work
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QName, 0, 0},
 | 
			
		||||
                 declare_passive(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 | 
			
		||||
 | 
			
		||||
    %% declaring an exclusive queue should declare a classic queue
 | 
			
		||||
    QNameEx = iolist_to_binary([QName, <<"_exclusive">>]),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QNameEx, 0, 0},
 | 
			
		||||
                 amqp_channel:call(Ch, #'queue.declare'{queue = QNameEx,
 | 
			
		||||
                                                        exclusive = true,
 | 
			
		||||
                                                        durable = true,
 | 
			
		||||
                                                        arguments = []})),
 | 
			
		||||
    assert_queue_type(Node, VHost, QNameEx, rabbit_classic_queue),
 | 
			
		||||
 | 
			
		||||
    %% transient declares should also fall back to classic queues
 | 
			
		||||
    QNameTr = iolist_to_binary([QName, <<"_transient">>]),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QNameTr, 0, 0},
 | 
			
		||||
                 amqp_channel:call(Ch, #'queue.declare'{queue = QNameTr,
 | 
			
		||||
                                                        exclusive = false,
 | 
			
		||||
                                                        durable = false,
 | 
			
		||||
                                                        arguments = []})),
 | 
			
		||||
    assert_queue_type(Node, VHost, QNameTr, rabbit_classic_queue),
 | 
			
		||||
 | 
			
		||||
    %% auto-delete declares should also fall back to classic queues
 | 
			
		||||
    QNameAd = iolist_to_binary([QName, <<"_delete">>]),
 | 
			
		||||
    ?assertEqual({'queue.declare_ok', QNameAd, 0, 0},
 | 
			
		||||
                 amqp_channel:call(Ch, #'queue.declare'{queue = QNameAd,
 | 
			
		||||
                                                        exclusive = false,
 | 
			
		||||
                                                        auto_delete = true,
 | 
			
		||||
                                                        durable = true,
 | 
			
		||||
                                                        arguments = []})),
 | 
			
		||||
    assert_queue_type(Node, VHost, QNameAd, rabbit_classic_queue),
 | 
			
		||||
    amqp_connection:close(Conn),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
restart_all_types(Config) ->
 | 
			
		||||
    %% Test the node restart with both types of queues (quorum and classic) to
 | 
			
		||||
    %% ensure there are no regressions
 | 
			
		||||
| 
						 | 
				
			
			@ -2938,12 +2996,21 @@ declare(Ch, Q, Args) ->
 | 
			
		|||
                                           auto_delete = false,
 | 
			
		||||
                                           arguments = Args}).
 | 
			
		||||
 | 
			
		||||
declare_passive(Ch, Q, Args) ->
 | 
			
		||||
    amqp_channel:call(Ch, #'queue.declare'{queue = Q,
 | 
			
		||||
                                           durable = true,
 | 
			
		||||
                                           auto_delete = false,
 | 
			
		||||
                                           passive = true,
 | 
			
		||||
                                           arguments = Args}).
 | 
			
		||||
assert_queue_type(Server, Q, Expected) ->
 | 
			
		||||
    Actual = get_queue_type(Server, Q),
 | 
			
		||||
    assert_queue_type(Server, <<"/">>, Q, Expected).
 | 
			
		||||
 | 
			
		||||
assert_queue_type(Server, VHost, Q, Expected) ->
 | 
			
		||||
    Actual = get_queue_type(Server, VHost, Q),
 | 
			
		||||
    Expected = Actual.
 | 
			
		||||
 | 
			
		||||
get_queue_type(Server, Q0) ->
 | 
			
		||||
    QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
 | 
			
		||||
get_queue_type(Server, VHost, Q0) ->
 | 
			
		||||
    QNameRes = rabbit_misc:r(VHost, queue, Q0),
 | 
			
		||||
    {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
 | 
			
		||||
    amqqueue:get_type(Q1).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,7 +10,8 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddVhostCommand do
 | 
			
		|||
  @behaviour RabbitMQ.CLI.CommandBehaviour
 | 
			
		||||
 | 
			
		||||
  def switches(), do: [description: :string,
 | 
			
		||||
                       tags: :string]
 | 
			
		||||
                       tags: :string,
 | 
			
		||||
                       default_queue_type: :string]
 | 
			
		||||
  def aliases(), do: [d: :description]
 | 
			
		||||
 | 
			
		||||
  def merge_defaults(args, opts) do
 | 
			
		||||
| 
						 | 
				
			
			@ -19,11 +20,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddVhostCommand do
 | 
			
		|||
  use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
 | 
			
		||||
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
 | 
			
		||||
 | 
			
		||||
  def run([vhost], %{node: node_name, description: desc, tags: tags}) do
 | 
			
		||||
    :rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [vhost, desc, parse_tags(tags), Helpers.cli_acting_user()])
 | 
			
		||||
  def run([vhost], %{node: node_name, description: desc, tags: tags, default_queue_type: default_qt}) do
 | 
			
		||||
    meta = %{description: desc,
 | 
			
		||||
             tags: parse_tags(tags),
 | 
			
		||||
             default_queue_type: default_qt}
 | 
			
		||||
    :rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [vhost, meta, Helpers.cli_acting_user()])
 | 
			
		||||
  end
 | 
			
		||||
  def run([vhost], %{node: node_name, tags: tags}) do
 | 
			
		||||
    :rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [vhost, "", parse_tags(tags), Helpers.cli_acting_user()])
 | 
			
		||||
  def run([vhost], %{node: node_name, description: desc, tags: tags}) do
 | 
			
		||||
    :rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [vhost, description, tags, Helpers.cli_acting_user()])
 | 
			
		||||
  end
 | 
			
		||||
  def run([vhost], %{node: node_name}) do
 | 
			
		||||
    :rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [vhost, Helpers.cli_acting_user()])
 | 
			
		||||
| 
						 | 
				
			
			@ -31,13 +35,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddVhostCommand do
 | 
			
		|||
 | 
			
		||||
  use RabbitMQ.CLI.DefaultOutput
 | 
			
		||||
 | 
			
		||||
  def usage, do: "add_vhost <vhost> [--description <description> --tags \"<tag1>,<tag2>,<...>\"]"
 | 
			
		||||
  def usage, do: "add_vhost <vhost> [--description <description> --tags \"<tag1>,<tag2>,<...>\" --default-queue-type <quorum|classic|stream>]"
 | 
			
		||||
 | 
			
		||||
  def usage_additional() do
 | 
			
		||||
    [
 | 
			
		||||
      ["<vhost>", "Virtual host name"],
 | 
			
		||||
      ["--description <description>", "Virtual host description"],
 | 
			
		||||
      ["--tags <tags>", "Command separated list of tags"]
 | 
			
		||||
      ["--tags <tags>", "Command separated list of tags"],
 | 
			
		||||
      ["--default-queue-type <quorum|classic|stream>", "Queue type to use if no type is explicitly provided by the client"]
 | 
			
		||||
    ]
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostsCommand do
 | 
			
		|||
  @behaviour RabbitMQ.CLI.CommandBehaviour
 | 
			
		||||
  use RabbitMQ.CLI.DefaultOutput
 | 
			
		||||
 | 
			
		||||
  @info_keys ~w(name description tags tracing cluster_state)a
 | 
			
		||||
  @info_keys ~w(name description tags default_queue_type tracing cluster_state)a
 | 
			
		||||
 | 
			
		||||
  def info_keys(), do: @info_keys
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,6 +34,7 @@ defmodule AddVhostCommandTest do
 | 
			
		|||
    assert @command.validate(["new-vhost"], %{}) == :ok
 | 
			
		||||
    assert @command.validate(["new-vhost"], %{description: "Used by team A"}) == :ok
 | 
			
		||||
    assert @command.validate(["new-vhost"], %{description: "Used by team A for QA purposes", tags: "qa,team-a"}) == :ok
 | 
			
		||||
    assert @command.validate(["new-vhost"], %{description: "Used by team A for QA purposes", tags: "qa,team-a", default_queue_type: "quorum"}) == :ok
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @tag vhost: @vhost
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -156,6 +156,17 @@
 | 
			
		|||
          <th><label>Tags:</label></th>
 | 
			
		||||
          <td><input type="text" name="tags"/></td>
 | 
			
		||||
        </tr>
 | 
			
		||||
        <tr>
 | 
			
		||||
          <th><label>Default Queue Type:</label></th>
 | 
			
		||||
          <td>
 | 
			
		||||
            <!-- <select name="queuetype" onchange="select_queue_type(queuetype)"> -->
 | 
			
		||||
            <select name="defaultqueuetype">
 | 
			
		||||
                <option value="classic">Classic</option>
 | 
			
		||||
                <option value="quorum">Quorum</option>
 | 
			
		||||
                <option value="stream">Stream</option>
 | 
			
		||||
            </select>
 | 
			
		||||
          </td>
 | 
			
		||||
        </tr>
 | 
			
		||||
      </table>
 | 
			
		||||
      <input type="submit" value="Add virtual host"/>
 | 
			
		||||
    </form>
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,7 +10,7 @@
 | 
			
		|||
-export([init/2, resource_exists/2, to_json/2,
 | 
			
		||||
         content_types_provided/2, content_types_accepted/2,
 | 
			
		||||
         is_authorized/2, allowed_methods/2, accept_content/2,
 | 
			
		||||
         delete_resource/2, id/1, put_vhost/5]).
 | 
			
		||||
         delete_resource/2, id/1, put_vhost/6]).
 | 
			
		||||
-export([variances/2]).
 | 
			
		||||
 | 
			
		||||
-import(rabbit_misc, [pget/2]).
 | 
			
		||||
| 
						 | 
				
			
			@ -63,12 +63,17 @@ accept_content(ReqData0, Context = #context{user = #user{username = Username}})
 | 
			
		|||
        Trace = rabbit_mgmt_util:parse_bool(maps:get(tracing, BodyMap, undefined)),
 | 
			
		||||
        Description = maps:get(description, BodyMap, <<"">>),
 | 
			
		||||
        Tags = maps:get(tags, BodyMap, <<"">>),
 | 
			
		||||
        case put_vhost(Name, Description, Tags, Trace, Username) of
 | 
			
		||||
        DefaultQT = maps:get(defaultqueuetype, BodyMap, undefined),
 | 
			
		||||
        case put_vhost(Name, Description, Tags, DefaultQT, Trace, Username) of
 | 
			
		||||
            ok ->
 | 
			
		||||
                {true, ReqData, Context};
 | 
			
		||||
            {error, timeout} = E ->
 | 
			
		||||
                rabbit_mgmt_util:internal_server_error(
 | 
			
		||||
                  "Timed out while waiting for the vhost to initialise", E,
 | 
			
		||||
                  ReqData0, Context);
 | 
			
		||||
            {error, E} ->
 | 
			
		||||
                rabbit_mgmt_util:internal_server_error(
 | 
			
		||||
                  "Error occured while adding vhost", E,
 | 
			
		||||
                  ReqData0, Context)
 | 
			
		||||
        end
 | 
			
		||||
      end).
 | 
			
		||||
| 
						 | 
				
			
			@ -93,5 +98,5 @@ id(ReqData) ->
 | 
			
		|||
      Value   -> Value
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
put_vhost(Name, Description, Tags, Trace, Username) ->
 | 
			
		||||
    rabbit_vhost:put_vhost(Name, Description, Tags, Trace, Username).
 | 
			
		||||
put_vhost(Name, Description, Tags, DefaultQT, Trace, Username) ->
 | 
			
		||||
    rabbit_vhost:put_vhost(Name, Description, Tags, DefaultQT, Trace, Username).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue