Merge pull request #4062 from rabbitmq/mk-definition-import-checksumming
Make it possible to skip definition import if imported contents have not changed
This commit is contained in:
commit
bcb6b26d3a
|
@ -148,12 +148,18 @@ fun(Conf) ->
|
|||
http -> rabbit_definitions_import_https;
|
||||
%% accept both rabbitmq_ and rabbit_ (typical core module prefix)
|
||||
rabbitmq_definitions_import_local_filesystem -> rabbit_definitions_import_local_filesystem;
|
||||
rabbitmq_definitions_import_https -> rabbit_definitions_import_https;
|
||||
rabbitmq_definitions_import_http -> rabbit_definitions_import_https;
|
||||
%% any other value is used as is
|
||||
Module -> Module
|
||||
end
|
||||
end}.
|
||||
|
||||
{mapping, "definitions.skip_if_unchanged", "rabbit.definitions.skip_if_unchanged", [
|
||||
{datatype, {enum, [true, false]}}]}.
|
||||
|
||||
{mapping, "definitions.hashing.algorithm", "rabbit.definitions.hashing_algorithm", [
|
||||
{datatype, {enum, [sha, sha224, sha256, sha384, sha512]}}]}.
|
||||
|
||||
%% Load definitions from a remote URL over HTTPS. See
|
||||
%% https://www.rabbitmq.com/management.html#load-definitions
|
||||
{mapping, "definitions.https.url", "rabbit.definitions.url",
|
||||
|
|
|
@ -2,9 +2,32 @@
|
|||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
|
||||
%% This module is responsible for definition import. Definition import makes
|
||||
%% it possible to seed a cluster with virtual hosts, users, permissions, policies,
|
||||
%% a messaging topology, and so on.
|
||||
%%
|
||||
%% These resources can be loaded from a local filesystem (a JSON file or a conf.d-style
|
||||
%% directory of files), an HTTPS source or any other source using a user-provided module.
|
||||
%%
|
||||
%% Definition import can be performed on node boot or at any time via CLI tools
|
||||
%% or the HTTP API. On node boot, every node performs definition import independently.
|
||||
%% However, some resource types (queues and bindings) are imported only when a certain
|
||||
%% number of nodes join the cluster. This is so that queues and their dependent
|
||||
%% objects (bindings) have enough nodes to place their replicas on.
|
||||
%%
|
||||
%% It is possible for the user to opt into skipping definition import if
|
||||
%% file/source content has not changed.
|
||||
%%
|
||||
%% See also
|
||||
%%
|
||||
%% * rabbit.schema (core Cuttlefish schema mapping file)
|
||||
%% * rabbit_definitions_import_local_filesystem
|
||||
%% * rabbit_definitions_import_http
|
||||
%% * rabbit_definitions_hashing
|
||||
-module(rabbit_definitions).
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
|
@ -19,7 +42,9 @@
|
|||
]).
|
||||
%% import
|
||||
-export([import_raw/1, import_raw/2, import_parsed/1, import_parsed/2,
|
||||
apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5]).
|
||||
import_parsed_with_hashing/1, import_parsed_with_hashing/2,
|
||||
apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5,
|
||||
should_skip_if_unchanged/0]).
|
||||
|
||||
-export([all_definitions/0]).
|
||||
-export([
|
||||
|
@ -101,6 +126,63 @@ import_parsed(Body0, VHost) ->
|
|||
Body = atomise_map_keys(Body0),
|
||||
apply_defs(Body, ?INTERNAL_USER, fun() -> ok end, VHost).
|
||||
|
||||
|
||||
%% Imports already-parsed definitions. When the node has opted into
%% skip-if-unchanged mode, the content hash of the submitted definitions is
%% compared with the globally stored hash and the import is skipped on a match.
-spec import_parsed_with_hashing(Defs :: #{any() => any()} | list()) -> ok | {error, term()}.
import_parsed_with_hashing(Body0) when is_list(Body0) ->
    import_parsed(maps:from_list(Body0));
import_parsed_with_hashing(Body0) when is_map(Body0) ->
    rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
    case should_skip_if_unchanged() of
        false ->
            import_parsed(Body0);
        true ->
            Body = atomise_map_keys(Body0),
            PreviousHash = rabbit_definitions_hashing:stored_global_hash(),
            Algo = rabbit_definitions_hashing:hashing_algorithm(),
            case rabbit_definitions_hashing:hash(Algo, Body) of
                PreviousHash ->
                    rabbit_log:info("Submitted definition content hash matches the stored one: ~s", [binary:part(rabbit_misc:hexify(PreviousHash), 0, 12)]),
                    ok;
                Other ->
                    %% the submitted hash (Other) fills the first placeholder,
                    %% the stored hash (PreviousHash) the second
                    rabbit_log:debug("Submitted definition content hash: ~s, stored one: ~s", [
                        binary:part(rabbit_misc:hexify(Other), 0, 10),
                        binary:part(rabbit_misc:hexify(PreviousHash), 0, 10)
                    ]),
                    Result = apply_defs(Body, ?INTERNAL_USER),
                    %% store the new hash only after the import has been attempted
                    rabbit_definitions_hashing:store_global_hash(Other),
                    Result
            end
    end.
|
||||
|
||||
%% Same as import_parsed_with_hashing/1 but imports the definitions into a
%% single virtual host, using a per-virtual-host stored hash.
-spec import_parsed_with_hashing(Defs :: #{any() => any()} | list(), VHost :: vhost:name()) -> ok | {error, term()}.
import_parsed_with_hashing(Body0, VHost) when is_list(Body0) ->
    import_parsed(maps:from_list(Body0), VHost);
import_parsed_with_hashing(Body0, VHost) ->
    %% the format string lists the virtual host first, then the acting user
    rabbit_log:info("Asked to import definitions for virtual host '~s'. Acting user: ~s", [VHost, ?INTERNAL_USER]),

    case should_skip_if_unchanged() of
        false ->
            import_parsed(Body0, VHost);
        true ->
            Body = atomise_map_keys(Body0),
            PreviousHash = rabbit_definitions_hashing:stored_vhost_specific_hash(VHost),
            Algo = rabbit_definitions_hashing:hashing_algorithm(),
            case rabbit_definitions_hashing:hash(Algo, Body) of
                PreviousHash ->
                    rabbit_log:info("Submitted definition content hash matches the stored one: ~s", [binary:part(rabbit_misc:hexify(PreviousHash), 0, 12)]),
                    ok;
                Other ->
                    %% the submitted hash (Other) fills the first placeholder,
                    %% the stored hash (PreviousHash) the second
                    rabbit_log:debug("Submitted definition content hash: ~s, stored one: ~s", [
                        binary:part(rabbit_misc:hexify(Other), 0, 10),
                        binary:part(rabbit_misc:hexify(PreviousHash), 0, 10)
                    ]),
                    Result = apply_defs(Body, ?INTERNAL_USER, fun() -> ok end, VHost),
                    %% store the new hash only after the import has been attempted
                    rabbit_definitions_hashing:store_vhost_specific_hash(VHost, Other, ?INTERNAL_USER),
                    Result
            end
    end.
|
||||
|
||||
|
||||
-spec all_definitions() -> map().
|
||||
all_definitions() ->
|
||||
Xs = list_exchanges(),
|
||||
|
@ -169,7 +251,25 @@ maybe_load_definitions_from_local_filesystem(App, Key) ->
|
|||
{ok, none} -> ok;
|
||||
{ok, Path} ->
|
||||
IsDir = filelib:is_dir(Path),
|
||||
rabbit_definitions_import_local_filesystem:load(IsDir, Path)
|
||||
Mod = rabbit_definitions_import_local_filesystem,
|
||||
|
||||
case should_skip_if_unchanged() of
|
||||
false ->
|
||||
rabbit_log:debug("Will use module ~s to import definitions", [Mod]),
|
||||
Mod:load(IsDir, Path);
|
||||
true ->
|
||||
Algo = rabbit_definitions_hashing:hashing_algorithm(),
|
||||
rabbit_log:debug("Will use module ~s to import definitions (if definition file/directory has changed, hashing algo: ~s)", [Mod, Algo]),
|
||||
CurrentHash = rabbit_definitions_hashing:stored_global_hash(),
|
||||
rabbit_log:debug("Previously stored hash value of imported definitions: ~s...", [binary:part(rabbit_misc:hexify(CurrentHash), 0, 12)]),
|
||||
case Mod:load_with_hashing(IsDir, Path, CurrentHash, Algo) of
|
||||
CurrentHash ->
|
||||
rabbit_log:info("Hash value of imported definitions matches current contents");
|
||||
UpdatedHash ->
|
||||
rabbit_log:debug("Hash value of imported definitions has changed to ~s", [binary:part(rabbit_misc:hexify(UpdatedHash), 0, 12)]),
|
||||
rabbit_definitions_hashing:store_global_hash(UpdatedHash)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
maybe_load_definitions_from_pluggable_source(App, Key) ->
|
||||
|
@ -183,8 +283,23 @@ maybe_load_definitions_from_pluggable_source(App, Key) ->
|
|||
{error, "definition import source is configured but definitions.import_backend is not set"};
|
||||
ModOrAlias ->
|
||||
Mod = normalize_backend_module(ModOrAlias),
|
||||
rabbit_log:debug("Will use module ~s to import definitions", [Mod]),
|
||||
Mod:load(Proplist)
|
||||
case should_skip_if_unchanged() of
|
||||
false ->
|
||||
rabbit_log:debug("Will use module ~s to import definitions", [Mod]),
|
||||
Mod:load(Proplist);
|
||||
true ->
|
||||
rabbit_log:debug("Will use module ~s to import definitions (if definition file/directory/source has changed)", [Mod]),
|
||||
CurrentHash = rabbit_definitions_hashing:stored_global_hash(),
|
||||
rabbit_log:debug("Previously stored hash value of imported definitions: ~s...", [binary:part(rabbit_misc:hexify(CurrentHash), 0, 12)]),
|
||||
Algo = rabbit_definitions_hashing:hashing_algorithm(),
|
||||
case Mod:load_with_hashing(Proplist, CurrentHash, Algo) of
|
||||
CurrentHash ->
|
||||
rabbit_log:info("Hash value of imported definitions matches current contents");
|
||||
UpdatedHash ->
|
||||
rabbit_log:debug("Hash value of imported definitions has changed to ~s...", [binary:part(rabbit_misc:hexify(UpdatedHash), 0, 12)]),
|
||||
rabbit_definitions_hashing:store_global_hash(UpdatedHash)
|
||||
end
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -232,19 +347,34 @@ atomise_map_keys(Decoded) ->
|
|||
Acc#{rabbit_data_coercion:to_atom(K, utf8) => V}
|
||||
end, Decoded, Decoded).
|
||||
|
||||
%% Returns true only when the user has opted into skipping unchanged
%% definition imports AND the cluster has reached its target size.
-spec should_skip_if_unchanged() -> boolean().
should_skip_if_unchanged() ->
    OptedIn =
        case application:get_env(rabbit, definitions) of
            {ok, Proplist} when is_list(Proplist), Proplist =/= [] ->
                pget(skip_if_unchanged, Proplist, false);
            %% covers 'undefined', {ok, none} and {ok, []}
            _ ->
                false
        end,
    %% if we do not take this into consideration, delayed queue import will be delayed
    %% on nodes that join before the target cluster size is reached, and skipped
    %% once it is
    OptedIn andalso rabbit_nodes:reached_target_cluster_size().
|
||||
|
||||
|
||||
-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok' | {error, term()}.
|
||||
|
||||
apply_defs(Map, ActingUser) ->
|
||||
apply_defs(Map, ActingUser, fun () -> ok end).
|
||||
|
||||
-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
|
||||
SuccessFun :: fun(() -> 'ok')) -> 'ok' | {error, term()};
|
||||
(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
|
||||
VHost :: vhost:name()) -> 'ok' | {error, term()}.
|
||||
-type vhost_or_success_fun() :: vhost:name() | fun(() -> 'ok').
|
||||
-spec apply_defs(Map :: #{atom() => any()},
|
||||
ActingUser :: rabbit_types:username(),
|
||||
VHostOrSuccessFun :: vhost_or_success_fun()) -> 'ok' | {error, term()}.
|
||||
|
||||
apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
|
||||
apply_defs(Map, ActingUser, fun () -> ok end, VHost);
|
||||
|
||||
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
|
||||
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
|
||||
try
|
||||
|
@ -289,7 +419,7 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
|
|||
SuccessFun :: fun(() -> 'ok'),
|
||||
VHost :: vhost:name()) -> 'ok' | {error, term()}.
|
||||
|
||||
apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
|
||||
apply_defs(Map, ActingUser, SuccessFun, VHost) when is_function(SuccessFun); is_binary(VHost) ->
|
||||
rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p",
|
||||
[VHost, ActingUser]),
|
||||
try
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
%% This module is responsible for definition content hashing. Content hashing
|
||||
%% makes it possible for the user to opt into skipping definition import if
|
||||
%% file/source content has not changed.
|
||||
%%
|
||||
%% See also
|
||||
%%
|
||||
%% * rabbit.schema (core Cuttlefish schema mapping file)
|
||||
%% * rabbit_definitions
|
||||
%% * rabbit_definitions_import_local_filesystem
|
||||
%% * rabbit_definitions_import_http
|
||||
-module(rabbit_definitions_hashing).
|
||||
|
||||
-behaviour(rabbit_runtime_parameter).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-import(rabbit_misc, [pget/2, pget/3]).
|
||||
|
||||
-export([
|
||||
register/0,
|
||||
validate/5,
|
||||
notify/5,
|
||||
notify_clear/4,
|
||||
|
||||
hashing_algorithm/0,
|
||||
hash/1,
|
||||
hash/2,
|
||||
stored_global_hash/0,
|
||||
store_global_hash/1,
|
||||
store_global_hash/2,
|
||||
store_vhost_specific_hash/3,
|
||||
stored_vhost_specific_hash/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_HASHING_ALGORITHM, sha256).
|
||||
-define(GLOBAL_RUNTIME_PARAMETER_KEY, imported_definition_hash_value).
|
||||
-define(RUNTIME_PARAMETER_COMPONENT, <<"imported_definition_hash_value">>).
|
||||
-define(PARAMETER_NAME, <<"content_hash_value">>).
|
||||
|
||||
%%
|
||||
%% API
|
||||
%%
|
||||
|
||||
-rabbit_boot_step({?MODULE,
|
||||
[{description, "imported definition hash value parameters"},
|
||||
{mfa, {?MODULE, register, []}},
|
||||
{requires, rabbit_registry},
|
||||
{enables, recovery}]}).
|
||||
|
||||
%% Registers this module as the runtime parameter handler for the
%% imported definition hash value component.
register() ->
    rabbit_registry:register(runtime_parameter, ?RUNTIME_PARAMETER_COMPONENT, ?MODULE).
|
||||
|
||||
|
||||
%% Runtime parameter callback: a stored hash value must be a binary.
validate(_VHost, ?RUNTIME_PARAMETER_COMPONENT, Name, Term, _User) ->
    rabbit_parameter_validation:binary(Name, Term).
|
||||
|
||||
%% Runtime parameter callback invoked when the parameter is set or updated.
notify(_VHost, ?RUNTIME_PARAMETER_COMPONENT, _Name, _Term0, _ActingUser) ->
    %% this parameter is used internally by RabbitMQ core, so we don't expose
    %% state changes via internal events
    ok.
|
||||
|
||||
%% Runtime parameter callback invoked when the parameter is cleared.
notify_clear(_VHost, ?RUNTIME_PARAMETER_COMPONENT, _Name, _ActingUser) ->
    %% this parameter is used internally by RabbitMQ core, so we don't expose
    %% state changes via internal events
    ok.
|
||||
|
||||
|
||||
|
||||
%% Returns the hashing algorithm configured under rabbit.definitions, the
%% default algorithm when the key is configured but the algorithm is not,
%% or 'undefined' when definition import is not configured at all.
-spec hashing_algorithm() -> crypto:sha1() | crypto:sha2() | undefined.
hashing_algorithm() ->
    case application:get_env(rabbit, definitions) of
        {ok, Proplist} when is_list(Proplist), Proplist =/= [] ->
            pget(hashing_algorithm, Proplist, ?DEFAULT_HASHING_ALGORITHM);
        %% covers 'undefined', {ok, none} and {ok, []}
        _ ->
            undefined
    end.
|
||||
|
||||
%% Hashes an arbitrary term using the configured hashing algorithm.
-spec hash(Value :: term()) -> binary().
hash(Value) ->
    %% Delegate to hash/2 so that an unset algorithm ('undefined') falls back
    %% to the default instead of crashing in crypto:hash/2, and the value is
    %% serialised with term_to_binary/1 consistently with hash/2.
    hash(hashing_algorithm(), Value).

%% Hashes an arbitrary term with the given algorithm; 'undefined' falls back
%% to the default algorithm.
-spec hash(Algo :: crypto:sha1() | crypto:sha2() | undefined, Value :: term()) -> binary().
hash(undefined, Value) ->
    hash(?DEFAULT_HASHING_ALGORITHM, Value);
hash(Algo, Value) ->
    crypto:hash(Algo, term_to_binary(Value)).
|
||||
|
||||
%% Returns the globally stored definition content hash, or 'undefined'
%% when no hash has been stored yet (e.g. before the first import).
-spec stored_global_hash() -> binary() | undefined.
stored_global_hash() ->
    case rabbit_runtime_parameters:lookup_global(?GLOBAL_RUNTIME_PARAMETER_KEY) of
        not_found -> undefined;
        Proplist  -> pget(value, Proplist)
    end.
|
||||
|
||||
%% Returns the definition content hash stored for the given virtual host,
%% or 'undefined' when no hash has been stored for it yet.
-spec stored_vhost_specific_hash(vhost:name()) -> binary() | undefined.
stored_vhost_specific_hash(VHostName) ->
    case rabbit_runtime_parameters:lookup(VHostName, ?RUNTIME_PARAMETER_COMPONENT, ?PARAMETER_NAME) of
        not_found -> undefined;
        Proplist  -> pget(value, Proplist)
    end.
|
||||
|
||||
%% Stores the global definition content hash on behalf of the internal user.
-spec store_global_hash(Value :: term()) -> ok.
store_global_hash(Value) ->
    rabbit_log:debug("Storing global imported definitions content hash, hex value: ~s", [rabbit_misc:hexify(Value)]),
    store_global_hash(Value, ?INTERNAL_USER).
|
||||
|
||||
%% Stores the global definition content hash as a global runtime parameter,
%% attributed to the given user. The value is coerced to a binary first.
-spec store_global_hash(Value0 :: term(), Username :: rabbit_types:username()) -> ok.
store_global_hash(Value0, Username) ->
    Value = rabbit_data_coercion:to_binary(Value0),
    rabbit_runtime_parameters:set_global(?GLOBAL_RUNTIME_PARAMETER_KEY, Value, Username).
|
||||
|
||||
%% Stores the definition content hash for a virtual host as a runtime
%% parameter, attributed to the given user. The value is coerced to a
%% binary first.
%% Note: the spec now matches the clause's argument order (the virtual host
%% comes first, then the value).
-spec store_vhost_specific_hash(VirtualHost :: vhost:name(), Value0 :: term(), Username :: rabbit_types:username()) -> ok.
store_vhost_specific_hash(VirtualHost, Value0, Username) ->
    Value = rabbit_data_coercion:to_binary(Value0),
    rabbit_runtime_parameters:set(VirtualHost, ?RUNTIME_PARAMETER_COMPONENT, ?PARAMETER_NAME, Value, Username).
|
|
@ -2,15 +2,24 @@
|
|||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
%% This module is responsible for loading definition from an HTTPS endpoint.
|
||||
%%
|
||||
%% See also
|
||||
%%
|
||||
%% * rabbit.schema (core Cuttlefish schema mapping file)
|
||||
%% * rabbit_definitions
|
||||
%% * rabbit_definitions_import_local_filesystem
|
||||
%% * rabbit_definitions_hashing
|
||||
-module(rabbit_definitions_import_https).
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-export([
|
||||
is_enabled/0,
|
||||
load/1
|
||||
load/1,
|
||||
load_with_hashing/3
|
||||
]).
|
||||
|
||||
|
||||
|
@ -37,39 +46,52 @@ is_enabled() ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec load(Proplist :: list() | map()) -> ok | {error, term()}.
|
||||
load(Proplist) ->
|
||||
rabbit_log:debug("Definitions properties: ~p", [Proplist]),
|
||||
URL = pget(url, Proplist),
|
||||
TLSOptions0 = [
|
||||
%% avoids a peer verification warning emitted by default if no certificate chain and peer verification
|
||||
%% settings are provided: these are not essential in this particular case (client-side downloads that likely
|
||||
%% will happen from a local trusted source)
|
||||
{log_level, error},
|
||||
%% use TLSv1.2 by default
|
||||
{versions, ['tlsv1.2']}
|
||||
],
|
||||
TLSOptions = pget(ssl_options, Proplist, TLSOptions0),
|
||||
HTTPOptions = [
|
||||
{ssl, TLSOptions}
|
||||
],
|
||||
rabbit_log:info("Applying definitions from a remote URL"),
|
||||
rabbit_log:debug("HTTPS URL: ~s", [URL]),
|
||||
TLSOptions = tls_options_or_default(Proplist),
|
||||
HTTPOptions = http_options(TLSOptions),
|
||||
load_from_url(URL, HTTPOptions).
|
||||
|
||||
%% Downloads definitions from the configured URL and imports them unless
%% their content hash equals PreviousHash. Returns the (possibly unchanged)
%% hash on success, or an {error, _} tuple when the download fails — the
%% previous spec omitted the error returns the body produces.
-spec load_with_hashing(Proplist :: list() | map(), PreviousHash :: binary() | 'undefined', Algo :: crypto:sha1() | crypto:sha2()) -> binary() | 'undefined' | {error, term()}.
load_with_hashing(Proplist, PreviousHash, Algo) ->
    URL = pget(url, Proplist),
    rabbit_log:info("Applying definitions from a remote URL"),
    rabbit_log:debug("Loading definitions with content hashing enabled, HTTPS URL: ~s, previous hash value: ~s",
                     [URL, rabbit_misc:hexify(PreviousHash)]),

    TLSOptions = tls_options_or_default(Proplist),
    HTTPOptions = http_options(TLSOptions),

    case httpc_get(URL, HTTPOptions) of
        %% 2XX
        {ok, {{_, Code, _}, _Headers, Body}} when Code div 100 == 2 ->
            rabbit_log:debug("Requested definitions from remote URL '~s', response code: ~b", [URL, Code]),
            rabbit_log:debug("Requested definitions from remote URL '~s', body: ~p", [URL, Body]),
            case rabbit_definitions_hashing:hash(Algo, Body) of
                %% hash unchanged: skip the import entirely
                PreviousHash -> PreviousHash;
                Other ->
                    rabbit_log:debug("New hash: ~s", [rabbit_misc:hexify(Other)]),
                    %% NOTE(review): the import result is discarded and the new
                    %% hash is returned regardless — confirm this is intended
                    import_raw(Body),
                    Other
            end;
        {ok, {{_, Code, _}, _Headers, _Body}} when Code >= 400 ->
            rabbit_log:debug("Requested definitions from remote URL '~s', response code: ~b", [URL, Code]),
            {error, {could_not_read_defs, {URL, rabbit_misc:format("URL request failed with response code ~b", [Code])}}};
        {error, Reason} ->
            rabbit_log:error("Requested definitions from remote URL '~s', error: ~p", [URL, Reason]),
            {error, {could_not_read_defs, {URL, Reason}}}
    end.
|
||||
|
||||
|
||||
%%
|
||||
%% Implementation
|
||||
%%
|
||||
|
||||
load_from_url(URL, HTTPOptions0) ->
|
||||
inets:start(),
|
||||
Options = [
|
||||
{body_format, binary}
|
||||
],
|
||||
HTTPOptions = HTTPOptions0 ++ [
|
||||
{connect_timeout, 120000},
|
||||
{autoredirect, true}
|
||||
],
|
||||
rabbit_log:info("Applying definitions from remote URL"),
|
||||
case httpc:request(get, {URL, []}, lists:usort(HTTPOptions), Options) of
|
||||
case httpc_get(URL, HTTPOptions0) of
|
||||
%% 2XX
|
||||
{ok, {{_, Code, _}, _Headers, Body}} when Code div 100 == 2 ->
|
||||
rabbit_log:debug("Requested definitions from remote URL '~s', response code: ~b", [URL, Code]),
|
||||
|
@ -82,3 +104,36 @@ load_from_url(URL, HTTPOptions0) ->
|
|||
rabbit_log:error("Requested definitions from remote URL '~s', error: ~p", [URL, Reason]),
|
||||
{error, {could_not_read_defs, {URL, Reason}}}
|
||||
end.
|
||||
|
||||
%% Performs a GET request against URL, returning the httpc result with the
%% response body as a binary. Connection defaults (timeout, redirects) are
%% merged with the caller-provided HTTP options.
httpc_get(URL, HTTPOptions0) ->
    inets:start(),
    HTTPOptions = lists:usort(HTTPOptions0 ++ [
        {connect_timeout, 120000},
        {autoredirect, true}
    ]),
    httpc:request(get, {URL, []}, HTTPOptions, [{body_format, binary}]).
|
||||
|
||||
%% Builds the HTTP client option list: the given TLS options plus
%% connection defaults (timeout, automatic redirects).
http_options(TLSOptions) ->
    [
        {ssl, TLSOptions},
        {connect_timeout, 120000},
        {autoredirect, true}
    ].
|
||||
|
||||
%% Returns the caller-provided ssl_options from Proplist, or conservative
%% defaults when none are configured.
tls_options_or_default(Proplist) ->
    Defaults = [
        %% avoids a peer verification warning emitted by default if no certificate chain and peer verification
        %% settings are provided: these are not essential in this particular case (client-side downloads that likely
        %% will happen from a local trusted source)
        {log_level, error},
        %% use TLSv1.2 by default
        {versions, ['tlsv1.2']}
    ],
    pget(ssl_options, Proplist, Defaults).
|
||||
|
|
|
@ -2,9 +2,18 @@
|
|||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
%% This module is responsible for loading definition from a local filesystem
|
||||
%% (a JSON file or a conf.d-style directory of files).
|
||||
%%
|
||||
%% See also
|
||||
%%
|
||||
%% * rabbit.schema (core Cuttlefish schema mapping file)
|
||||
%% * rabbit_definitions
|
||||
%% * rabbit_definitions_import_http
|
||||
%% * rabbit_definitions_hashing
|
||||
-module(rabbit_definitions_import_local_filesystem).
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
|
@ -14,7 +23,12 @@
|
|||
load/1,
|
||||
%% classic arguments specific to this source
|
||||
load/2,
|
||||
location/0
|
||||
load_with_hashing/3,
|
||||
load_with_hashing/4,
|
||||
location/0,
|
||||
|
||||
%% tests and REPL
|
||||
compiled_definitions_from_local_path/2
|
||||
]).
|
||||
|
||||
|
||||
|
@ -31,6 +45,7 @@
|
|||
is_enabled() ->
|
||||
is_enabled_via_classic_option() or is_enabled_via_modern_option().
|
||||
|
||||
-spec load(Proplist :: list() | map()) -> ok | {error, term()}.
|
||||
load(Proplist) when is_list(Proplist) ->
|
||||
case pget(local_path, Proplist, undefined) of
|
||||
undefined -> {error, "local definition file path is not configured: local_path is not set"};
|
||||
|
@ -51,10 +66,40 @@ load(Proplist) when is_list(Proplist) ->
|
|||
Msg = rabbit_misc:format("local definition file '~s' does not exist or cannot be read by the node", [Path]),
|
||||
{error, {could_not_read_defs, Msg}}
|
||||
end
|
||||
end;
|
||||
load(Map) when is_map(Map) ->
|
||||
load(maps:to_list(Map)).
|
||||
|
||||
-spec load(IsDir :: boolean(), Path :: file:name_all()) -> ok | {error, term()}.
|
||||
load(IsDir, Path) when is_boolean(IsDir) ->
|
||||
load_from_local_path(IsDir, Path).
|
||||
|
||||
%% Loads definitions from the configured local path, skipping the import
%% when the content hash equals PreviousHash. Returns an {error, _} tuple
%% when local_path is not configured — the previous spec omitted this
%% return shape.
-spec load_with_hashing(Proplist :: list() | map(), PreviousHash :: binary() | 'undefined', Algo :: crypto:sha1() | crypto:sha2()) -> binary() | 'undefined' | {error, term()}.
load_with_hashing(Proplist, PreviousHash, Algo) ->
    case pget(local_path, Proplist, undefined) of
        undefined -> {error, "local definition file path is not configured: local_path is not set"};
        Path ->
            IsDir = filelib:is_dir(Path),
            load_with_hashing(IsDir, Path, PreviousHash, Algo)
    end.
|
||||
|
||||
load(IsDir, Path) ->
|
||||
load_from_local_path(IsDir, Path).
|
||||
%% Hashes the definitions found at Path (a file or a directory) and imports
%% them only when the hash differs from PreviousHash. Returns the hash of
%% the current contents either way.
-spec load_with_hashing(IsDir :: boolean(), Path :: file:name_all(), PreviousHash :: binary() | 'undefined', Algo :: crypto:sha1() | crypto:sha2()) -> binary() | 'undefined'.
load_with_hashing(IsDir, Path, PreviousHash, Algo) when is_boolean(IsDir) ->
    rabbit_log:debug("Loading definitions with content hashing enabled, path: ~s, is directory?: ~p, previous hash value: ~s",
                     [Path, IsDir, rabbit_misc:hexify(PreviousHash)]),
    case compiled_definitions_from_local_path(IsDir, Path) of
        %% the directory is empty or no files could be read
        [] ->
            %% hash the atom 'undefined' so that an empty source still yields
            %% a stable, comparable hash value
            rabbit_definitions_hashing:hash(Algo, undefined);
        Defs ->
            case rabbit_definitions_hashing:hash(Algo, Defs) of
                %% contents unchanged: skip the import
                PreviousHash -> PreviousHash;
                Other ->
                    rabbit_log:debug("New hash: ~s", [rabbit_misc:hexify(Other)]),
                    %% NOTE(review): the load result is discarded and the new
                    %% hash is returned even if the import fails — confirm
                    load_from_local_path(IsDir, Path),
                    Other
            end
    end.
|
||||
|
||||
location() ->
|
||||
case location_from_classic_option() of
|
||||
|
@ -62,6 +107,7 @@ location() ->
|
|||
Value -> Value
|
||||
end.
|
||||
|
||||
-spec load_from_local_path(IsDir :: boolean(), Path :: file:name_all()) -> ok | {error, term()}.
|
||||
load_from_local_path(true, Dir) ->
|
||||
rabbit_log:info("Applying definitions from directory ~s", [Dir]),
|
||||
load_from_files(file:list_dir(Dir), Dir);
|
||||
|
@ -111,6 +157,37 @@ location_from_modern_option() ->
|
|||
pget(local_path, Proplist)
|
||||
end.
|
||||
|
||||
%% Reads the raw definition file bodies found at the given path. For a
%% directory, files are read in sorted order and unreadable files are
%% skipped; for a single file, an unreadable file yields an empty list.
-spec compiled_definitions_from_local_path(IsDir :: boolean(), Dir :: file:name_all()) -> [binary()] | {error, any()}.
compiled_definitions_from_local_path(true = _IsDir, Dir) ->
    case file:list_dir(Dir) of
        {ok, Names} ->
            Paths = [filename:join(Dir, Name) || Name <- lists:sort(Names)],
            %% keep only the bodies that could actually be read
            lists:filtermap(
                fun (Path) ->
                        case rabbit_misc:raw_read_file(Path) of
                            {ok, Body} -> {true, Body};
                            {error, _} -> false
                        end
                end, Paths);
        {error, E} ->
            rabbit_log:error("Could not list files in '~s', error: ~p", [Dir, E]),
            {error, {could_not_read_defs, {Dir, E}}}
    end;
compiled_definitions_from_local_path(false = _IsDir, Path) ->
    case read_file_contents(Path) of
        {error, _} -> [];
        Body -> [Body]
    end.
|
||||
|
||||
%% Reads the entire file at Path, returning its contents as a binary or an
%% {error, _} tuple (the read error is also logged).
-spec read_file_contents(Path :: file:name_all()) -> binary() | {error, any()}.
read_file_contents(Path) ->
    case rabbit_misc:raw_read_file(Path) of
        {ok, Body} ->
            Body;
        {error, E} ->
            rabbit_log:error("Could not read definitions from file at '~s', error: ~p", [Path, E]),
            {error, {could_not_read_defs, {Path, E}}}
    end.
|
||||
|
||||
load_from_files({ok, Filenames0}, Dir) ->
|
||||
Filenames1 = lists:sort(Filenames0),
|
||||
|
|
|
@ -418,7 +418,7 @@ global_info_keys() -> [name, value].
|
|||
|
||||
lookup_component(Component) ->
|
||||
case rabbit_registry:lookup_module(
|
||||
runtime_parameter, list_to_atom(binary_to_list(Component))) of
|
||||
runtime_parameter, rabbit_data_coercion:to_atom(Component)) of
|
||||
{error, not_found} -> {errors,
|
||||
[{"component ~s not found", [Component]}]};
|
||||
{ok, Module} -> {ok, Module}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(definition_import_SUITE).
|
||||
|
@ -20,6 +20,7 @@ all() ->
|
|||
%% uses rabbit.definitions with import_backend set to local_filesystem
|
||||
{group, boot_time_import_using_modern_local_filesystem_source},
|
||||
{group, boot_time_import_using_public_https_source},
|
||||
{group, skip_if_unchanged},
|
||||
{group, roundtrip},
|
||||
{group, import_on_a_running_node}
|
||||
].
|
||||
|
@ -67,6 +68,12 @@ groups() ->
|
|||
{roundtrip, [], [
|
||||
export_import_round_trip_case1,
|
||||
export_import_round_trip_case2
|
||||
]},
|
||||
|
||||
{skip_if_unchanged, [], [
|
||||
%% these all must import the same definition file
|
||||
import_on_a_booting_node_using_skip_if_unchanged,
|
||||
import_case5
|
||||
]}
|
||||
].
|
||||
|
||||
|
@ -107,6 +114,22 @@ init_per_group(boot_time_import_using_modern_local_filesystem_source = Group, Co
|
|||
]}
|
||||
]}),
|
||||
rabbit_ct_helpers:run_setup_steps(Config2, rabbit_ct_broker_helpers:setup_steps());
|
||||
%% same as the classic source semantically, uses skip_if_unchanged
|
||||
init_per_group(skip_if_unchanged = Group, Config) ->
|
||||
CasePath = filename:join(?config(data_dir, Config), "case5.json"),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, Group},
|
||||
{rmq_nodes_count, 1}
|
||||
]),
|
||||
Config2 = rabbit_ct_helpers:merge_app_env(Config1,
|
||||
{rabbit, [
|
||||
{definitions, [
|
||||
{import_backend, rabbit_definitions_import_local_filesystem},
|
||||
{local_path, CasePath},
|
||||
{skip_if_unchanged, true}
|
||||
]}
|
||||
]}),
|
||||
rabbit_ct_helpers:run_setup_steps(Config2, rabbit_ct_broker_helpers:setup_steps());
|
||||
init_per_group(boot_time_import_using_public_https_source = Group, Config) ->
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, Group},
|
||||
|
@ -325,6 +348,15 @@ import_on_a_booting_node_using_public_https_source(Config) ->
|
|||
{error, timeout} -> ct:fail("virtual host ~p was not imported on boot", [VHost])
|
||||
end.
|
||||
|
||||
import_on_a_booting_node_using_skip_if_unchanged(Config) ->
|
||||
%% see case5.json
|
||||
VHost = <<"vhost2">>,
|
||||
%% verify that vhost2 eventually starts
|
||||
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, await_running_on_all_nodes, [VHost, 3000]) of
|
||||
ok -> ok;
|
||||
{error, timeout} -> ct:fail("virtual host ~p was not imported on boot", [VHost])
|
||||
end.
|
||||
|
||||
%%
|
||||
%% Implementation
|
||||
%%
|
||||
|
|
|
@ -46,6 +46,7 @@
|
|||
-export([atom_to_binary/1, parse_bool/1, parse_int/1]).
|
||||
-export([pid_to_string/1, string_to_pid/1,
|
||||
pid_change_node/2, node_to_fake_pid/1]).
|
||||
-export([hexify/1]).
|
||||
-export([version_compare/2, version_compare/3]).
|
||||
-export([version_minor_equivalent/2, strict_version_minor_equivalent/2]).
|
||||
-export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]).
|
||||
|
@ -776,6 +777,14 @@ pid_to_string(Pid) when is_pid(Pid) ->
|
|||
{Node, Cre, Id, Ser} = decompose_pid(Pid),
|
||||
format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).
|
||||
|
||||
%% Returns an uppercase hexadecimal representation of a binary; lists
%% (strings) and atoms are converted to binaries first.
-spec hexify(binary() | atom() | list()) -> binary().
hexify(Bin) when is_binary(Bin) ->
    iolist_to_binary([io_lib:format("~2.16.0B", [V]) || <<V:8>> <= Bin]);
hexify(Bin) when is_list(Bin) ->
    %% a list must be converted *to* a binary; binary_to_list/1 on a list
    %% is a badarg
    hexify(erlang:list_to_binary(Bin));
hexify(Bin) when is_atom(Bin) ->
    hexify(erlang:atom_to_binary(Bin)).
|
||||
|
||||
%% inverse of above
|
||||
string_to_pid(Str) ->
|
||||
Err = {error, {invalid_pid_syntax, Str}},
|
||||
|
|
|
@ -13,10 +13,10 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
|
|||
{args, Map.merge(%{format: "json", silent: true}, Helpers.case_insensitive_format(opts))}
|
||||
end
|
||||
def merge_defaults(args, opts) do
|
||||
{args, Map.merge(%{format: "json"}, Helpers.case_insensitive_format(opts))}
|
||||
{args, Map.merge(%{format: "json", skip_if_unchanged: false}, Helpers.case_insensitive_format(opts))}
|
||||
end
|
||||
|
||||
def switches(), do: [timeout: :integer, format: :string]
|
||||
def switches(), do: [timeout: :integer, format: :string, skip_if_unchanged: :boolean]
|
||||
def aliases(), do: [t: :timeout]
|
||||
|
||||
def validate(_, %{format: format})
|
||||
|
@ -44,11 +44,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
|
|||
{:error, error} ->
|
||||
{:error, ExitCodes.exit_dataerr(), "Failed to deserialise input (format: #{human_friendly_format(format)}) (error: #{inspect(error)})"}
|
||||
{:ok, map} ->
|
||||
:rabbit_misc.rpc_call(node_name, :rabbit_definitions, :import_parsed, [map], timeout)
|
||||
skip? = Map.get(map, :skip_if_unchanged, false)
|
||||
fun = case skip? do
|
||||
true -> :import_parsed_with_hashing
|
||||
false -> :import_parsed
|
||||
end
|
||||
:rabbit_misc.rpc_call(node_name, :rabbit_definitions, fun, [map], timeout)
|
||||
end
|
||||
end
|
||||
end
|
||||
def run([path], %{node: node_name, timeout: timeout, format: format}) do
|
||||
def run([path], %{node: node_name, format: format, timeout: timeout}) do
|
||||
abs_path = Path.absname(path)
|
||||
|
||||
case File.read(abs_path) do
|
||||
|
@ -59,7 +64,12 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
|
|||
{:error, error} ->
|
||||
{:error, ExitCodes.exit_dataerr(), "Failed to deserialise input (format: #{human_friendly_format(format)}) (error: #{inspect(error)})"}
|
||||
{:ok, map} ->
|
||||
:rabbit_misc.rpc_call(node_name, :rabbit_definitions, :import_parsed, [map], timeout)
|
||||
skip? = Map.get(map, :skip_if_unchanged, false)
|
||||
fun = case skip? do
|
||||
true -> :import_parsed_with_hashing
|
||||
false -> :import_parsed
|
||||
end
|
||||
:rabbit_misc.rpc_call(node_name, :rabbit_definitions, fun, [map], timeout)
|
||||
end
|
||||
{:error, :enoent} ->
|
||||
{:error, ExitCodes.exit_dataerr(), "Parent directory or file #{path} does not exist"}
|
||||
|
@ -88,12 +98,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
|
|||
|
||||
def printer(), do: RabbitMQ.CLI.Printers.StdIORaw
|
||||
|
||||
def usage, do: "import_definitions <file_path | \"-\"> [--format <json | erlang>]"
|
||||
def usage, do: "import_definitions <file_path | \"-\"> [--format <json | erlang>] [--skip-if-unchanged]"
|
||||
|
||||
def usage_additional() do
|
||||
[
|
||||
["[file]", "Local file path to import from. If omitted will be read from standard input."],
|
||||
["--format", "input format to use: json or erlang"]
|
||||
["[file]", "Local file path to import from. If omitted will be read from standard input"],
|
||||
["--format", "input format to use: json or erlang"],
|
||||
["--skip-if-unchanged", "Avoids repetitive definition imports when file contents are unchanged. Target node must be configured accordingly"]
|
||||
]
|
||||
end
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ defmodule ImportDefinitionsCommandTest do
|
|||
|
||||
test "merge_defaults: defaults to JSON for format" do
|
||||
assert @command.merge_defaults([valid_file_path()], %{}) ==
|
||||
{[valid_file_path()], %{format: "json"}}
|
||||
{[valid_file_path()], %{format: "json", skip_if_unchanged: false}}
|
||||
end
|
||||
|
||||
test "merge_defaults: defaults to --silent if target is stdout" do
|
||||
|
@ -36,14 +36,14 @@ defmodule ImportDefinitionsCommandTest do
|
|||
|
||||
test "merge_defaults: format is case insensitive" do
|
||||
assert @command.merge_defaults([valid_file_path()], %{format: "JSON"}) ==
|
||||
{[valid_file_path()], %{format: "json"}}
|
||||
{[valid_file_path()], %{format: "json", skip_if_unchanged: false}}
|
||||
assert @command.merge_defaults([valid_file_path()], %{format: "Erlang"}) ==
|
||||
{[valid_file_path()], %{format: "erlang"}}
|
||||
{[valid_file_path()], %{format: "erlang", skip_if_unchanged: false}}
|
||||
end
|
||||
|
||||
test "merge_defaults: format can be overridden" do
|
||||
assert @command.merge_defaults([valid_file_path()], %{format: "erlang"}) ==
|
||||
{[valid_file_path()], %{format: "erlang"}}
|
||||
{[valid_file_path()], %{format: "erlang", skip_if_unchanged: false}}
|
||||
end
|
||||
|
||||
test "validate: accepts a file path argument", context do
|
||||
|
@ -82,6 +82,14 @@ defmodule ImportDefinitionsCommandTest do
|
|||
clear_parameter("/", "federation-upstream", "up-1")
|
||||
end
|
||||
|
||||
@tag format: "json"
|
||||
test "run: imports definitions from a file when --skip-if-unchanged is provided", context do
|
||||
assert :ok == @command.run([valid_file_path()], Map.merge(context[:opts], %{skip_if_unchanged: true}))
|
||||
|
||||
# clean up the state we've modified
|
||||
clear_parameter("/", "federation-upstream", "up-1")
|
||||
end
|
||||
|
||||
defp valid_file_path() do
|
||||
Path.join([File.cwd!(), "test", "fixtures", "files", "definitions.json"])
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue