Merge pull request #13837 from rabbitmq/dqt-export-fix
Modify default queue type injection logic
This commit is contained in:
commit
53f511fa15
|
|
@ -1081,12 +1081,10 @@ list_vhosts() ->
|
||||||
|
|
||||||
vhost_definition(VHost) ->
|
vhost_definition(VHost) ->
|
||||||
Name = vhost:get_name(VHost),
|
Name = vhost:get_name(VHost),
|
||||||
DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(Name)),
|
|
||||||
#{
|
#{
|
||||||
<<"name">> => Name,
|
<<"name">> => Name,
|
||||||
<<"limits">> => vhost:get_limits(VHost),
|
<<"limits">> => vhost:get_limits(VHost),
|
||||||
<<"metadata">> => vhost:get_metadata(VHost),
|
<<"metadata">> => vhost:get_metadata(VHost)
|
||||||
<<"default_queue_type">> => DQT
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
list_users() ->
|
list_users() ->
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,38 @@ recover(VHost) ->
|
||||||
ok = rabbit_file:ensure_dir(VHostStubFile),
|
ok = rabbit_file:ensure_dir(VHostStubFile),
|
||||||
ok = file:write_file(VHostStubFile, VHost),
|
ok = file:write_file(VHostStubFile, VHost),
|
||||||
ok = ensure_config_file(VHost),
|
ok = ensure_config_file(VHost),
|
||||||
|
|
||||||
|
%% in the past, a vhost didn't necessarily have a default queue type
|
||||||
|
%% and queues declared in that vhost defaulted to the type configured
|
||||||
|
%% on the node level (in the config file). Now each vhost has its default
|
||||||
|
%% queue type in the metadata. For vhosts updated from older versions,
|
||||||
|
%% we need to add the default type to the metadata
|
||||||
|
case rabbit_db_vhost:get(VHost) of
|
||||||
|
undefined ->
|
||||||
|
rabbit_log:warning("Cannot check metadata for vhost '~ts' during recovery, record not found.",
|
||||||
|
[VHost]);
|
||||||
|
VHostRecord ->
|
||||||
|
Metadata = vhost:get_metadata(VHostRecord),
|
||||||
|
case maps:is_key(default_queue_type, Metadata) of
|
||||||
|
true ->
|
||||||
|
rabbit_log:debug("Default queue type for vhost '~ts' is ~p.",
|
||||||
|
[VHost, maps:get(default_queue_type, Metadata)]),
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
DefaultType = rabbit_queue_type:default_alias(),
|
||||||
|
rabbit_log:info("Setting missing default queue type to '~p' for vhost '~ts'.",
|
||||||
|
[DefaultType, VHost]),
|
||||||
|
case rabbit_db_vhost:merge_metadata(VHost, #{default_queue_type => DefaultType}) of
|
||||||
|
{ok, _UpdatedVHostRecord} ->
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
% Log the error but continue recovery
|
||||||
|
rabbit_log:warning("Failed to set the default queue type for vhost '~ts': ~p",
|
||||||
|
[VHost, Reason])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
|
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
|
||||||
AllQs = Recovered ++ Failed,
|
AllQs = Recovered ++ Failed,
|
||||||
QNames = [amqqueue:get_name(Q) || Q <- AllQs],
|
QNames = [amqqueue:get_name(Q) || Q <- AllQs],
|
||||||
|
|
@ -157,8 +189,16 @@ add(Name, Metadata, ActingUser) ->
|
||||||
catch(do_add(Name, Metadata, ActingUser))
|
catch(do_add(Name, Metadata, ActingUser))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_add(Name, Metadata, ActingUser) ->
|
do_add(Name, Metadata0, ActingUser) ->
|
||||||
ok = is_over_vhost_limit(Name),
|
ok = is_over_vhost_limit(Name),
|
||||||
|
|
||||||
|
Metadata = case maps:is_key(default_queue_type, Metadata0) of
|
||||||
|
true ->
|
||||||
|
Metadata0;
|
||||||
|
false ->
|
||||||
|
Metadata0#{default_queue_type => rabbit_queue_type:default_alias()}
|
||||||
|
end,
|
||||||
|
|
||||||
Description = maps:get(description, Metadata, undefined),
|
Description = maps:get(description, Metadata, undefined),
|
||||||
Tags = maps:get(tags, Metadata, []),
|
Tags = maps:get(tags, Metadata, []),
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -215,6 +215,7 @@ disable_protection_from_deletion(VHost) ->
|
||||||
-spec new_metadata(binary(), [atom()], rabbit_queue_type:queue_type() | 'undefined') -> metadata().
|
-spec new_metadata(binary(), [atom()], rabbit_queue_type:queue_type() | 'undefined') -> metadata().
|
||||||
new_metadata(Description, Tags, undefined) ->
|
new_metadata(Description, Tags, undefined) ->
|
||||||
#{description => Description,
|
#{description => Description,
|
||||||
|
default_queue_type => rabbit_queue_type:default_alias(),
|
||||||
tags => Tags};
|
tags => Tags};
|
||||||
new_metadata(Description, Tags, DefaultQueueType) ->
|
new_metadata(Description, Tags, DefaultQueueType) ->
|
||||||
#{description => Description,
|
#{description => Description,
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
ClusterSize1Tests = [
|
ClusterSize1Tests = [
|
||||||
vhost_is_created_with_default_limits,
|
vhost_is_created_with_default_limits,
|
||||||
|
vhost_is_created_with_default_queue_type,
|
||||||
vhost_is_created_with_operator_policies,
|
vhost_is_created_with_operator_policies,
|
||||||
vhost_is_created_with_default_user,
|
vhost_is_created_with_default_user,
|
||||||
single_node_vhost_deletion_forces_connection_closure,
|
single_node_vhost_deletion_forces_connection_closure,
|
||||||
|
|
@ -307,13 +308,14 @@ vhost_update_default_queue_type_undefined(Config) ->
|
||||||
VHost = <<"update-default_queue_type-with-undefined-test">>,
|
VHost = <<"update-default_queue_type-with-undefined-test">>,
|
||||||
Description = <<"rmqfpas-105 test vhost">>,
|
Description = <<"rmqfpas-105 test vhost">>,
|
||||||
Tags = [replicate, private],
|
Tags = [replicate, private],
|
||||||
DefaultQueueType = quorum,
|
VhostDefaultQueueType = quorum,
|
||||||
|
NodeDefaultQueueType = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, default_alias, []),
|
||||||
Trace = false,
|
Trace = false,
|
||||||
ActingUser = <<"acting-user">>,
|
ActingUser = <<"acting-user">>,
|
||||||
try
|
try
|
||||||
?assertMatch(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
|
?assertMatch(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
|
||||||
|
|
||||||
PutVhostArgs0 = [VHost, Description, Tags, DefaultQueueType, Trace, ActingUser],
|
PutVhostArgs0 = [VHost, Description, Tags, VhostDefaultQueueType, Trace, ActingUser],
|
||||||
?assertMatch(ok,
|
?assertMatch(ok,
|
||||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs0)),
|
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs0)),
|
||||||
|
|
||||||
|
|
@ -322,7 +324,7 @@ vhost_update_default_queue_type_undefined(Config) ->
|
||||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs1)),
|
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, put_vhost, PutVhostArgs1)),
|
||||||
|
|
||||||
V = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]),
|
V = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]),
|
||||||
?assertMatch(#{default_queue_type := DefaultQueueType}, vhost:get_metadata(V))
|
?assertMatch(#{default_queue_type := NodeDefaultQueueType}, vhost:get_metadata(V))
|
||||||
after
|
after
|
||||||
rabbit_ct_broker_helpers:delete_vhost(Config, VHost)
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost)
|
||||||
end.
|
end.
|
||||||
|
|
@ -466,6 +468,33 @@ vhost_is_created_with_default_limits(Config) ->
|
||||||
application, unset_env, [rabbit, default_limits])
|
application, unset_env, [rabbit, default_limits])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
vhost_is_created_with_default_queue_type(Config) ->
|
||||||
|
VHost = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
QName = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
?assertEqual(ok, rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
|
application, set_env, [rabbit, default_queue_type, rabbit_quorum_queue])),
|
||||||
|
try
|
||||||
|
?assertEqual(ok, rabbit_ct_broker_helpers:add_vhost(Config, VHost)),
|
||||||
|
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
|
||||||
|
?assertEqual(<<"quorum">>, rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
|
rabbit_vhost, default_queue_type, [VHost])),
|
||||||
|
V = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_vhost, get, [VHost]),
|
||||||
|
ct:pal("Vhost metadata: ~p", [V]),
|
||||||
|
?assertEqual(<<"quorum">>, maps:get(default_queue_type, vhost:get_metadata(V))),
|
||||||
|
|
||||||
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
|
||||||
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
||||||
|
amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
|
||||||
|
QNameRes = rabbit_misc:r(VHost, queue, QName),
|
||||||
|
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QNameRes]),
|
||||||
|
?assertMatch(rabbit_quorum_queue, amqqueue:get_type(Q)),
|
||||||
|
close_connections([Conn])
|
||||||
|
after
|
||||||
|
rabbit_ct_broker_helpers:rpc(
|
||||||
|
Config, 0,
|
||||||
|
application, unset_env, [rabbit, default_queue_type])
|
||||||
|
end.
|
||||||
|
|
||||||
vhost_is_created_with_operator_policies(Config) ->
|
vhost_is_created_with_operator_policies(Config) ->
|
||||||
VHost = <<"vhost1">>,
|
VHost = <<"vhost1">>,
|
||||||
PolicyName = <<"default-operator-policy">>,
|
PolicyName = <<"default-operator-policy">>,
|
||||||
|
|
|
||||||
|
|
@ -132,10 +132,7 @@ vhost_definitions(ReqData, VHostName, Context) ->
|
||||||
ProductName = rabbit:product_name(),
|
ProductName = rabbit:product_name(),
|
||||||
ProductVersion = rabbit:product_version(),
|
ProductVersion = rabbit:product_version(),
|
||||||
|
|
||||||
DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(VHostName)),
|
Metadata = vhost:get_metadata(VHost),
|
||||||
%% note: the type changes to a map
|
|
||||||
VHost1 = rabbit_queue_type:inject_dqt(VHost),
|
|
||||||
Metadata = maps:get(metadata, VHost1),
|
|
||||||
|
|
||||||
TopLevelDefsAndMetadata = [
|
TopLevelDefsAndMetadata = [
|
||||||
{rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
|
{rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
|
||||||
|
|
@ -147,7 +144,6 @@ vhost_definitions(ReqData, VHostName, Context) ->
|
||||||
{explanation, rabbit_data_coercion:to_binary(io_lib:format("Definitions of virtual host '~ts'", [VHostName]))},
|
{explanation, rabbit_data_coercion:to_binary(io_lib:format("Definitions of virtual host '~ts'", [VHostName]))},
|
||||||
{metadata, Metadata},
|
{metadata, Metadata},
|
||||||
{description, vhost:get_description(VHost)},
|
{description, vhost:get_description(VHost)},
|
||||||
{default_queue_type, DQT},
|
|
||||||
{limits, vhost:get_limits(VHost)}
|
{limits, vhost:get_limits(VHost)}
|
||||||
],
|
],
|
||||||
Result = TopLevelDefsAndMetadata ++ retain_whitelisted(Contents),
|
Result = TopLevelDefsAndMetadata ++ retain_whitelisted(Contents),
|
||||||
|
|
@ -288,7 +284,7 @@ export_name(_Name) -> true.
|
||||||
|
|
||||||
rw_state() ->
|
rw_state() ->
|
||||||
[{users, [name, password_hash, hashing_algorithm, tags, limits]},
|
[{users, [name, password_hash, hashing_algorithm, tags, limits]},
|
||||||
{vhosts, [name, description, tags, default_queue_type, metadata]},
|
{vhosts, [name, description, tags, metadata]},
|
||||||
{permissions, [user, vhost, configure, write, read]},
|
{permissions, [user, vhost, configure, write, read]},
|
||||||
{topic_permissions, [user, vhost, exchange, write, read]},
|
{topic_permissions, [user, vhost, exchange, write, read]},
|
||||||
{parameters, [vhost, component, name, value]},
|
{parameters, [vhost, component, name, value]},
|
||||||
|
|
|
||||||
|
|
@ -2126,7 +2126,6 @@ definitions_vhost_metadata_test(Config) ->
|
||||||
?assertEqual(#{
|
?assertEqual(#{
|
||||||
name => VHostName,
|
name => VHostName,
|
||||||
description => Desc,
|
description => Desc,
|
||||||
default_queue_type => DQT,
|
|
||||||
tags => Tags,
|
tags => Tags,
|
||||||
metadata => Metadata
|
metadata => Metadata
|
||||||
}, VH),
|
}, VH),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue