Merge pull request #13854 from rabbitmq/mergify/bp/v4.1.x/pr-13837
Modify default queue type injection logic (backport #13837)
This commit is contained in:
		
						commit
						e569855eee
					
				| 
						 | 
				
			
			@ -1081,12 +1081,10 @@ list_vhosts() ->
 | 
			
		|||
 | 
			
		||||
vhost_definition(VHost) ->
 | 
			
		||||
    Name = vhost:get_name(VHost),
 | 
			
		||||
    DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(Name)),
 | 
			
		||||
    #{
 | 
			
		||||
        <<"name">> => Name,
 | 
			
		||||
        <<"limits">> => vhost:get_limits(VHost),
 | 
			
		||||
        <<"metadata">> => vhost:get_metadata(VHost),
 | 
			
		||||
        <<"default_queue_type">> => DQT
 | 
			
		||||
        <<"metadata">> => vhost:get_metadata(VHost)
 | 
			
		||||
    }.
 | 
			
		||||
 | 
			
		||||
list_users() ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -57,6 +57,38 @@ recover(VHost) ->
 | 
			
		|||
    ok = rabbit_file:ensure_dir(VHostStubFile),
 | 
			
		||||
    ok = file:write_file(VHostStubFile, VHost),
 | 
			
		||||
    ok = ensure_config_file(VHost),
 | 
			
		||||
 | 
			
		||||
    %% in the past, a vhost didn't necessarily have a default queue type
 | 
			
		||||
    %% and queues declared in that vhost defaulted to the type configured
 | 
			
		||||
    %% on the node level (in the config file). Now each vhost has its default
 | 
			
		||||
    %% queue type in the metadata. For vhosts updated from older versions,
 | 
			
		||||
    %% we need to add the default type to the metadata
 | 
			
		||||
    case rabbit_db_vhost:get(VHost) of
 | 
			
		||||
        undefined ->
 | 
			
		||||
            rabbit_log:warning("Cannot check metadata for vhost '~ts' during recovery, record not found.",
 | 
			
		||||
                [VHost]);
 | 
			
		||||
        VHostRecord ->
 | 
			
		||||
            Metadata = vhost:get_metadata(VHostRecord),
 | 
			
		||||
            case maps:is_key(default_queue_type, Metadata) of
 | 
			
		||||
                true ->
 | 
			
		||||
                    rabbit_log:debug("Default queue type for vhost '~ts' is ~p.",
 | 
			
		||||
                        [VHost, maps:get(default_queue_type, Metadata)]),
 | 
			
		||||
                    ok;
 | 
			
		||||
                false ->
 | 
			
		||||
                    DefaultType = rabbit_queue_type:default_alias(),
 | 
			
		||||
                    rabbit_log:info("Setting missing default queue type to '~p' for vhost '~ts'.",
 | 
			
		||||
                        [DefaultType, VHost]),
 | 
			
		||||
                    case rabbit_db_vhost:merge_metadata(VHost, #{default_queue_type => DefaultType}) of
 | 
			
		||||
                        {ok, _UpdatedVHostRecord} ->
 | 
			
		||||
                            ok;
 | 
			
		||||
                        {error, Reason} ->
 | 
			
		||||
                            % Log the error but continue recovery
 | 
			
		||||
                            rabbit_log:warning("Failed to set the default queue type for vhost '~ts': ~p",
 | 
			
		||||
                                [VHost, Reason])
 | 
			
		||||
                    end
 | 
			
		||||
            end
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    {Recovered, Failed} = rabbit_amqqueue:recover(VHost),
 | 
			
		||||
    AllQs = Recovered ++ Failed,
 | 
			
		||||
    QNames = [amqqueue:get_name(Q) || Q <- AllQs],
 | 
			
		||||
| 
						 | 
				
			
			@ -157,8 +189,16 @@ add(Name, Metadata, ActingUser) ->
 | 
			
		|||
            catch(do_add(Name, Metadata, ActingUser))
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
do_add(Name, Metadata, ActingUser) ->
 | 
			
		||||
do_add(Name, Metadata0, ActingUser) ->
 | 
			
		||||
    ok = is_over_vhost_limit(Name),
 | 
			
		||||
 | 
			
		||||
    Metadata = case maps:is_key(default_queue_type, Metadata0) of
 | 
			
		||||
        true ->
 | 
			
		||||
            Metadata0;
 | 
			
		||||
        false ->
 | 
			
		||||
            Metadata0#{default_queue_type => rabbit_queue_type:default_alias()}
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    Description = maps:get(description, Metadata, undefined),
 | 
			
		||||
    Tags = maps:get(tags, Metadata, []),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -215,7 +215,8 @@ disable_protection_from_deletion(VHost) ->
 | 
			
		|||
-spec new_metadata(binary(), [atom()], rabbit_queue_type:queue_type() | 'undefined') -> metadata().
 | 
			
		||||
new_metadata(Description, Tags, undefined) ->
 | 
			
		||||
    #{description => Description,
 | 
			
		||||
      tags => Tags};
 | 
			
		||||
        default_queue_type => rabbit_queue_type:default_alias(),
 | 
			
		||||
        tags => Tags};
 | 
			
		||||
new_metadata(Description, Tags, DefaultQueueType) ->
 | 
			
		||||
    #{description => Description,
 | 
			
		||||
      tags => Tags,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,6 +27,7 @@ all() ->
 | 
			
		|||
groups() ->
 | 
			
		||||
    ClusterSize1Tests = [
 | 
			
		||||
        vhost_is_created_with_default_limits,
 | 
			
		||||
        vhost_is_created_with_default_queue_type,
 | 
			
		||||
        vhost_is_created_with_operator_policies,
 | 
			
		||||
        vhost_is_created_with_default_user,
 | 
			
		||||
        single_node_vhost_deletion_forces_connection_closure,
 | 
			
		||||
| 
						 | 
				
			
			@ -307,13 +308,14 @@ vhost_update_default_queue_type_undefined(Config) ->
 | 
			
		|||
    VHost = <<"update-default_queue_type-with-undefined-test">>,
 | 
			
		||||
    Description = <<"rmqfpas-105 test vhost">>,
 | 
			
		||||
    Tags = [replicate, private],
 | 
			
		||||
    DefaultQueueType = quorum,
 | 
			
		||||
    VhostDefaultQueueType = quorum,
 | 
			
		||||
    NodeDefaultQueueType = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, default_alias, []),
 | 
			
		||||
    Trace = false,
 | 
			
		||||
    ActingUser = <<"acting-user">>,
 | 
			
		||||
    try
 | 
			
		||||
        ?assertMatch(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
 | 
			
		||||
 | 
			
		||||
        PutVhostArgs0 = [VHost, Description, Tags, DefaultQueueType, Trace, ActingUser],
 | 
			
		||||
        PutVhostArgs0 = [VHost, Description, Tags, VhostDefaultQueueType, Trace, ActingUser],
 | 
			
		||||
        ?assertMatch(ok,
 | 
			
		||||
                     rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs0)),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -322,7 +324,7 @@ vhost_update_default_queue_type_undefined(Config) ->
 | 
			
		|||
                     rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs1)),
 | 
			
		||||
 | 
			
		||||
        V = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]),
 | 
			
		||||
        ?assertMatch(#{default_queue_type := DefaultQueueType}, vhost:get_metadata(V))
 | 
			
		||||
        ?assertMatch(#{default_queue_type := NodeDefaultQueueType}, vhost:get_metadata(V))
 | 
			
		||||
    after
 | 
			
		||||
        rabbit_ct_broker_helpers:delete_vhost(Config, VHost)
 | 
			
		||||
    end.
 | 
			
		||||
| 
						 | 
				
			
			@ -460,10 +462,37 @@ vhost_is_created_with_default_limits(Config) ->
 | 
			
		|||
        ?assertEqual(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
 | 
			
		||||
        ?assertEqual(Limits, rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                rabbit_vhost_limit, list, [VHost]))
 | 
			
		||||
    after
 | 
			
		||||
        rabbit_ct_broker_helpers:rpc(
 | 
			
		||||
            Config, 0,
 | 
			
		||||
            application, unset_env, [rabbit, default_limits])
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
vhost_is_created_with_default_queue_type(Config) ->
 | 
			
		||||
    VHost = atom_to_binary(?FUNCTION_NAME),
 | 
			
		||||
    QName = atom_to_binary(?FUNCTION_NAME),
 | 
			
		||||
    ?assertEqual(ok, rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                            application, set_env, [rabbit, default_queue_type, rabbit_quorum_queue])),
 | 
			
		||||
    try
 | 
			
		||||
        ?assertEqual(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
 | 
			
		||||
        rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
 | 
			
		||||
        ?assertEqual(<<"quorum">>, rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                rabbit_vhost, default_queue_type, [VHost])),
 | 
			
		||||
        V = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_vhost, get, [VHost]),
 | 
			
		||||
        ct:pal("Vhost metadata: ~p", [V]),
 | 
			
		||||
        ?assertEqual(<<"quorum">>, maps:get(default_queue_type, vhost:get_metadata(V))),
 | 
			
		||||
 | 
			
		||||
        Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
 | 
			
		||||
        {ok, Chan} = amqp_connection:open_channel(Conn),
 | 
			
		||||
        amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
 | 
			
		||||
        QNameRes = rabbit_misc:r(VHost, queue, QName),
 | 
			
		||||
        {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QNameRes]),
 | 
			
		||||
        ?assertMatch(rabbit_quorum_queue, amqqueue:get_type(Q)),
 | 
			
		||||
        close_connections([Conn])
 | 
			
		||||
    after
 | 
			
		||||
        rabbit_ct_broker_helpers:rpc(
 | 
			
		||||
          Config, 0,
 | 
			
		||||
          application, unset_env, [rabbit, default_limits])
 | 
			
		||||
          application, unset_env, [rabbit, default_queue_type])
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
vhost_is_created_with_operator_policies(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -132,10 +132,7 @@ vhost_definitions(ReqData, VHostName, Context) ->
 | 
			
		|||
    ProductName = rabbit:product_name(),
 | 
			
		||||
    ProductVersion = rabbit:product_version(),
 | 
			
		||||
 | 
			
		||||
    DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(VHostName)),
 | 
			
		||||
    %% note: the type changes to a map
 | 
			
		||||
    VHost1 = rabbit_queue_type:inject_dqt(VHost),
 | 
			
		||||
    Metadata = maps:get(metadata, VHost1),
 | 
			
		||||
    Metadata = vhost:get_metadata(VHost),
 | 
			
		||||
 | 
			
		||||
    TopLevelDefsAndMetadata = [
 | 
			
		||||
        {rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
 | 
			
		||||
| 
						 | 
				
			
			@ -147,7 +144,6 @@ vhost_definitions(ReqData, VHostName, Context) ->
 | 
			
		|||
        {explanation, rabbit_data_coercion:to_binary(io_lib:format("Definitions of virtual host '~ts'", [VHostName]))},
 | 
			
		||||
        {metadata, Metadata},
 | 
			
		||||
        {description, vhost:get_description(VHost)},
 | 
			
		||||
        {default_queue_type, DQT},
 | 
			
		||||
        {limits, vhost:get_limits(VHost)}
 | 
			
		||||
    ],
 | 
			
		||||
    Result = TopLevelDefsAndMetadata ++ retain_whitelisted(Contents),
 | 
			
		||||
| 
						 | 
				
			
			@ -288,7 +284,7 @@ export_name(_Name)                -> true.
 | 
			
		|||
 | 
			
		||||
rw_state() ->
 | 
			
		||||
    [{users,              [name, password_hash, hashing_algorithm, tags, limits]},
 | 
			
		||||
     {vhosts,             [name, description, tags, default_queue_type, metadata]},
 | 
			
		||||
     {vhosts,             [name, description, tags, metadata]},
 | 
			
		||||
     {permissions,        [user, vhost, configure, write, read]},
 | 
			
		||||
     {topic_permissions,  [user, vhost, exchange, write, read]},
 | 
			
		||||
     {parameters,         [vhost, component, name, value]},
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2126,7 +2126,6 @@ definitions_vhost_metadata_test(Config) ->
 | 
			
		|||
    ?assertEqual(#{
 | 
			
		||||
        name => VHostName,
 | 
			
		||||
        description => Desc,
 | 
			
		||||
        default_queue_type => DQT,
 | 
			
		||||
        tags => Tags,
 | 
			
		||||
        metadata => Metadata
 | 
			
		||||
    }, VH),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue