Merge branch 'master' into delegate_opt

This commit is contained in:
Michael Klishin 2021-12-26 01:46:29 +03:00
commit 8a0ad56182
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
42 changed files with 996 additions and 223 deletions

View File

@ -13,6 +13,7 @@ on:
- '*.bzl'
- '*.bazel'
- .github/workflows/test.yaml
pull_request:
jobs:
test:
name: Test

View File

@ -34,13 +34,13 @@ buildbuddy(
git_repository(
name = "rbe_23",
commit = "d2b454dc5138a2a92de45a0a672241a4fbb5a1e5",
commit = "b21c066e426de48e526cc0f8c5158b7024d04e85",
remote = "https://github.com/rabbitmq/rbe-erlang-platform.git",
)
git_repository(
name = "rbe_24",
commit = "a087892ef4202dc3245b64d36d5921491848315f",
commit = "c8cbf65e2facbe464ebbcee7b6cf6f7a2d422ded",
remote = "https://github.com/rabbitmq/rbe-erlang-platform.git",
)

View File

@ -703,9 +703,6 @@ suites = [
additional_hdrs = [
"src/rabbit_fifo.hrl",
],
erlc_opts = [
"-I deps/rabbit", # allow rabbit_fifo.hrl to be included at src/rabbit_fifo.hrl
],
runtime_deps = [
"@meck//:bazel_erlang_lib",
"@ra//:bazel_erlang_lib",

View File

@ -500,7 +500,7 @@ stop_and_halt() ->
%% init:stop() will be called regardless of any errors.
try
AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
?LOG_ERROR(
?LOG_INFO(
lists:flatten(
["Halting Erlang VM with the following applications:~n",
[" ~p~n" || _ <- AppsLeft]]),

View File

@ -38,35 +38,42 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
%% extra auth properties like MQTT client id are in AuthProps
{ok, Modules} = application:get_env(rabbit, auth_backends),
R = lists:foldl(
fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) ->
%% It is possible to specify authn/authz within the cache module settings,
%% so we have to do both auth steps here
%% See this rabbitmq-users discussion:
%% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion
try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps);
({ModN, ModZs}, {refused, _, _, _}) ->
%% Different modules for authN vs authZ. So authenticate
%% with authN module, then if that succeeds do
%% passwordless (i.e pre-authenticated) login with authZ.
try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps);
(Mod, {refused, _, _, _}) ->
%% Same module for authN and authZ. Just take the result
%% it gives us
case try_authenticate(Mod, Username, AuthProps) of
{ok, ModNUser = #auth_user{username = Username2, impl = Impl}} ->
rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]),
user(ModNUser, {ok, [{Mod, Impl}], []});
Else ->
rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]),
Else
end;
(_, {ok, User}) ->
%% We've successfully authenticated. Skip to the end...
{ok, User}
end,
{refused, Username, "No modules checked '~s'", [Username]}, Modules),
R.
try
lists:foldl(
fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) ->
%% It is possible to specify authn/authz within the cache module settings,
%% so we have to do both auth steps here
%% See this rabbitmq-users discussion:
%% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion
try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps);
({ModN, ModZs}, {refused, _, _, _}) ->
%% Different modules for authN vs authZ. So authenticate
%% with authN module, then if that succeeds do
%% passwordless (i.e pre-authenticated) login with authZ.
try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps);
(Mod, {refused, _, _, _}) ->
%% Same module for authN and authZ. Just take the result
%% it gives us
case try_authenticate(Mod, Username, AuthProps) of
{ok, ModNUser = #auth_user{username = Username2, impl = Impl}} ->
rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]),
user(ModNUser, {ok, [{Mod, Impl}], []});
Else ->
rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]),
Else
end;
(_, {ok, User}) ->
%% We've successfully authenticated. Skip to the end...
{ok, User}
end,
{refused, Username, "No modules checked '~s'", [Username]}, Modules)
catch
Type:Error:Stacktrace ->
rabbit_log:debug("User '~s' authentication failed with ~s:~p:~n~p", [Username, Type, Error, Stacktrace]),
{refused, Username, "User '~s' authentication failed with internal error. "
"Enable debug logs to see the real error.", [Username]}
end.
try_authenticate_and_try_authorize(ModN, ModZs0, Username, AuthProps) ->
ModZs = case ModZs0 of

View File

@ -14,12 +14,15 @@
-export([user_login_authentication/2, user_login_authorization/2,
check_vhost_access/3, check_resource_access/4, check_topic_access/4]).
-export([add_user/3, delete_user/2, lookup_user/1, exists/1,
-export([add_user/3, add_user/4, add_user/5, delete_user/2, lookup_user/1, exists/1,
change_password/3, clear_password/2,
hash_password/2, change_password_hash/2, change_password_hash/3,
set_tags/3, set_permissions/6, clear_permissions/3,
set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4,
add_user_sans_validation/3, put_user/2, put_user/3]).
add_user_sans_validation/3, put_user/2, put_user/3,
update_user/5,
update_user_with_hash/5,
add_user_sans_validation/6]).
-export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1,
is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]).
@ -208,14 +211,56 @@ add_user(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun add_user_sans_validation/3).
-spec add_user(rabbit_types:username(), rabbit_types:password(),
rabbit_types:username(), [atom()]) -> 'ok' | {'error', string()}.
add_user(Username, Password, ActingUser, Tags) ->
add_user(Username, Password, ActingUser, undefined, Tags).
add_user(Username, Password, ActingUser, Limits, Tags) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
add_user_sans_validation(Limits, Tags)).
%% Creates a user while bypassing credential validation, with no tags and
%% no per-user limits; delegates to add_user_sans_validation/5.
add_user_sans_validation(Username, Password, ActingUser) ->
add_user_sans_validation(Username, Password, ActingUser, undefined, []).
%% Returns a closure (shaped for validate_and_alternate_credentials/4)
%% that captures Limits and Tags and performs the actual validation-free
%% user creation when invoked with Username/Password/ActingUser.
add_user_sans_validation(Limits, Tags) ->
fun(Username, Password, ActingUser) ->
add_user_sans_validation(Username, Password, ActingUser, Limits, Tags)
end.
add_user_sans_validation(Username, Password, ActingUser, Limits, Tags) ->
rabbit_log:debug("Asked to create a new user '~s', password length in bytes: ~p", [Username, bit_size(Password)]),
%% hash_password will pick the hashing function configured for us
%% but we also need to store a hint as part of the record, so we
%% retrieve it here one more time
HashingMod = rabbit_password:hashing_mod(),
PasswordHash = hash_password(HashingMod, Password),
User = internal_user:create_user(Username, PasswordHash, HashingMod),
User0 = internal_user:create_user(Username, PasswordHash, HashingMod),
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
User1 = internal_user:set_tags(User0, ConvertedTags),
User = case Limits of
undefined -> User1;
Term -> internal_user:update_limits(add, User1, Term)
end,
add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser).
%% Creates a user from a precomputed password hash (no plaintext password
%% is involved, so no credential validation is applied). Tags are coerced
%% to atoms and Limits (a term or 'undefined') are attached before the
%% record is stored by add_user_sans_validation_in/5.
add_user_sans_validation(Username, PasswordHash, HashingAlgorithm, Tags, Limits, ActingUser) ->
rabbit_log:debug("Asked to create a new user '~s' with password hash", [Username]),
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
HashingMod = rabbit_password:hashing_mod(),
%% NOTE(review): create_user/3 already receives PasswordHash with the
%% configured hashing module, and set_password_hash below then re-sets
%% the hash with the caller-supplied HashingAlgorithm — presumably the
%% second write is what matters; confirm the first is only a record init.
User0 = internal_user:create_user(Username, PasswordHash, HashingMod),
User1 = internal_user:set_tags(
internal_user:set_password_hash(User0,
PasswordHash, HashingAlgorithm),
ConvertedTags),
User = case Limits of
undefined -> User1;
Term -> internal_user:update_limits(add, User1, Term)
end,
add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser).
add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser) ->
try
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@ -229,6 +274,14 @@ add_user_sans_validation(Username, Password, ActingUser) ->
rabbit_log:info("Created user '~s'", [Username]),
rabbit_event:notify(user_created, [{name, Username},
{user_who_performed_action, ActingUser}]),
case ConvertedTags of
[] -> ok;
_ -> notify_user_tags_set(Username, ConvertedTags, ActingUser)
end,
case Limits of
undefined -> ok;
_ -> notify_limit_set(Username, ActingUser, Limits)
end,
R
catch
throw:{error, {user_already_exists, _}} = Error ->
@ -322,6 +375,42 @@ change_password_sans_validation(Username, Password, ActingUser) ->
erlang:raise(Class, Error, Stacktrace)
end.
%% Updates a user's password, tags and (optionally) limits in one pass,
%% after the new credentials have been validated
%% (see validate_and_alternate_credentials/4).
update_user(Username, Password, Tags, Limits, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
update_user_sans_validation(Tags, Limits)).
%% Returns a closure (shaped for validate_and_alternate_credentials/4)
%% that hashes the new password with the configured algorithm and applies
%% password hash, tags and limits via a single update_user_with_hash/5
%% call, then emits the corresponding user_password_changed and
%% user_tags_set events. A missing user is logged and re-thrown; any
%% other exception is logged and re-raised with its original stacktrace.
update_user_sans_validation(Tags, Limits) ->
fun(Username, Password, ActingUser) ->
try
rabbit_log:debug("Asked to change password of user '~s', new password length in bytes: ~p", [Username, bit_size(Password)]),
HashingAlgorithm = rabbit_password:hashing_mod(),
rabbit_log:debug("Asked to set user tags for user '~s' to ~p", [Username, Tags]),
%% tags may arrive as binaries/strings; events and storage use atoms
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
R = update_user_with_hash(Username,
hash_password(rabbit_password:hashing_mod(),
Password),
HashingAlgorithm,
ConvertedTags,
Limits),
rabbit_log:info("Successfully changed password for user '~s'", [Username]),
rabbit_event:notify(user_password_changed,
[{name, Username},
{user_who_performed_action, ActingUser}]),
notify_user_tags_set(Username, ConvertedTags, ActingUser),
R
catch
throw:{error, {no_such_user, _}} = Error ->
rabbit_log:warning("Failed to change password for user '~s': the user does not exist", [Username]),
throw(Error);
Class:Error:Stacktrace ->
rabbit_log:warning("Failed to change password for user '~s': ~p", [Username, Error]),
erlang:raise(Class, Error, Stacktrace)
end
end.
-spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
clear_password(Username, ActingUser) ->
@ -346,10 +435,22 @@ change_password_hash(Username, PasswordHash) ->
change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
update_user(Username, fun(User) ->
internal_user:set_password_hash(User,
PasswordHash, HashingAlgorithm)
end).
update_user_with_hash(Username, PasswordHash, HashingAlgorithm, [], undefined).
update_user_with_hash(Username, PasswordHash, HashingAlgorithm, ConvertedTags, Limits) ->
update_user(Username,
fun(User0) ->
User1 = internal_user:set_password_hash(User0,
PasswordHash, HashingAlgorithm),
User2 = case Limits of
undefined -> User1;
_ -> internal_user:update_limits(add, User1, Limits)
end,
case ConvertedTags of
[] -> User2;
_ -> internal_user:set_tags(User2, ConvertedTags)
end
end).
-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
@ -360,9 +461,7 @@ set_tags(Username, Tags, ActingUser) ->
R = update_user(Username, fun(User) ->
internal_user:set_tags(User, ConvertedTags)
end),
rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]),
rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
{user_who_performed_action, ActingUser}]),
notify_user_tags_set(Username, ConvertedTags, ActingUser),
R
catch
throw:{error, {no_such_user, _}} = Error ->
@ -373,6 +472,11 @@ set_tags(Username, Tags, ActingUser) ->
erlang:raise(Class, Error, Stacktrace)
end .
%% Logs and emits a 'user_tags_set' event after a user's tags were updated.
notify_user_tags_set(Username, ConvertedTags, ActingUser) ->
rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]),
rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
{user_who_performed_action, ActingUser}]).
-spec set_permissions
(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
regexp(), rabbit_types:username()) ->
@ -648,13 +752,27 @@ put_user(User, Version, ActingUser) ->
rabbit_credential_validation:validate(Username, Password) =:= ok
end,
Limits = case rabbit_feature_flags:is_enabled(user_limits) of
false ->
undefined;
true ->
case maps:get(limits, User, undefined) of
undefined ->
undefined;
Term ->
case validate_user_limits(Term) of
ok -> Term;
Error -> throw(Error)
end
end
end,
case exists(Username) of
true ->
case {HasPassword, HasPasswordHash} of
{true, false} ->
update_user_password(PassedCredentialValidation, Username, Password, Tags, ActingUser);
update_user_password(PassedCredentialValidation, Username, Password, Tags, Limits, ActingUser);
{false, true} ->
update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser);
update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version);
{true, true} ->
throw({error, both_password_and_password_hash_are_provided});
%% clear password, update tags if needed
@ -665,63 +783,54 @@ put_user(User, Version, ActingUser) ->
false ->
case {HasPassword, HasPasswordHash} of
{true, false} ->
create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, ActingUser);
create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, Limits, ActingUser);
{false, true} ->
create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, ActingUser);
create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, Limits, ActingUser);
{true, true} ->
throw({error, both_password_and_password_hash_are_provided});
{false, false} ->
%% this user won't be able to sign in using
%% a username/password pair but can be used for x509 certificate authentication,
%% with authn backends such as HTTP or LDAP and so on.
create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, ActingUser)
create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, Limits, ActingUser)
end
end.
update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, ActingUser) ->
rabbit_auth_backend_internal:change_password(Username, Password, ActingUser),
rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _ActingUser) ->
update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, Limits, ActingUser) ->
%% change_password, set_tags and limits
rabbit_auth_backend_internal:update_user(Username, Password, Tags, Limits, ActingUser);
update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _Limits, _ActingUser) ->
%% we don't log here because
%% rabbit_auth_backend_internal will do it
throw({error, credential_validation_failed}).
update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser) ->
update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version) ->
%% when a hash this provided, credential validation
%% is not applied
HashingAlgorithm = hashing_algorithm(User, Version),
Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
rabbit_auth_backend_internal:change_password_hash(
Username, Hash, HashingAlgorithm),
rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser).
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
rabbit_auth_backend_internal:update_user_with_hash(
Username, Hash, HashingAlgorithm, ConvertedTags, Limits).
create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, ActingUser) ->
rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, ActingUser) ->
rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, Limits, ActingUser) ->
rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags);
create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, Limits, ActingUser) ->
rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags),
preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser);
create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _) ->
create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _, _) ->
%% we don't log here because
%% rabbit_auth_backend_internal will do it
throw({error, credential_validation_failed}).
create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, ActingUser) ->
create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, Limits, ActingUser) ->
%% when a hash this provided, credential validation
%% is not applied
HashingAlgorithm = hashing_algorithm(User, Version),
Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
%% first we create a user with dummy credentials and no
%% validation applied, then we update password hash
TmpPassword = rabbit_guid:binary(rabbit_guid:gen_secure(), "tmp"),
rabbit_auth_backend_internal:add_user_sans_validation(Username, TmpPassword, ActingUser),
rabbit_auth_backend_internal:change_password_hash(
Username, Hash, HashingAlgorithm),
rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
rabbit_auth_backend_internal:add_user_sans_validation(Username, Hash, HashingAlgorithm, Tags, Limits, ActingUser),
preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser).
preconfigure_permissions(_Username, undefined, _ActingUser) ->
@ -756,8 +865,7 @@ set_user_limits(Username, Definition, ActingUser) when is_map(Definition) ->
end.
validate_parameters_and_update_limit(Username, Term, ActingUser) ->
case flatten_errors(rabbit_parameter_validation:proplist(
<<"user-limits">>, user_limit_validation(), Term)) of
case validate_user_limits(Term) of
ok ->
update_user(Username, fun(User) ->
internal_user:update_limits(add, User, Term)
@ -767,6 +875,10 @@ validate_parameters_and_update_limit(Username, Term, ActingUser) ->
{error_string, rabbit_misc:format(Reason, Arguments)}
end.
%% Validates a per-user limits proplist against user_limit_validation/0,
%% collapsing the per-key results into a single ok-or-error via
%% flatten_errors/1.
validate_user_limits(Term) ->
flatten_errors(rabbit_parameter_validation:proplist(
<<"user-limits">>, user_limit_validation(), Term)).
%% Validation spec for per-user limits: both supported keys are optional
%% and must be integers.
user_limit_validation() ->
[{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
{<<"max-channels">>, fun rabbit_parameter_validation:integer/2, optional}].

View File

@ -1306,7 +1306,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
case DoConfirm orelse Mandatory of
case DoConfirm of
false -> {undefined, State0};
true -> rabbit_global_counters:messages_received_confirm(amqp091, 1),
SeqNo = State0#ch.publish_seqno,

View File

@ -464,6 +464,10 @@ add_policy(Param, Username) ->
add_policy(VHost, Param, Username) ->
Key = maps:get(name, Param, undefined),
case Key of
undefined -> exit(rabbit_misc:format("policy in virtual host '~s' has undefined name", [VHost]));
_ -> ok
end,
case rabbit_policy:set(
VHost, Key, maps:get(pattern, Param, undefined),
case maps:get(definition, Param, undefined) of

View File

@ -33,6 +33,7 @@
get_disk_free/0, set_enabled/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
-define(DEFAULT_DISK_FREE_LIMIT, 50000000).
@ -73,51 +74,42 @@
%%----------------------------------------------------------------------------
-spec get_disk_free_limit() -> integer().
get_disk_free_limit() ->
gen_server:call(?MODULE, get_disk_free_limit, infinity).
safe_ets_lookup(disk_free_limit, ?DEFAULT_DISK_FREE_LIMIT).
-spec set_disk_free_limit(disk_free_limit()) -> 'ok'.
set_disk_free_limit(Limit) ->
gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
gen_server:call(?MODULE, {set_disk_free_limit, Limit}).
-spec get_min_check_interval() -> integer().
get_min_check_interval() ->
gen_server:call(?MODULE, get_min_check_interval, infinity).
safe_ets_lookup(min_check_interval, ?DEFAULT_MIN_DISK_CHECK_INTERVAL).
-spec set_min_check_interval(integer()) -> 'ok'.
set_min_check_interval(Interval) ->
gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity).
gen_server:call(?MODULE, {set_min_check_interval, Interval}).
-spec get_max_check_interval() -> integer().
get_max_check_interval() ->
gen_server:call(?MODULE, get_max_check_interval, infinity).
safe_ets_lookup(max_check_interval, ?DEFAULT_MAX_DISK_CHECK_INTERVAL).
-spec set_max_check_interval(integer()) -> 'ok'.
set_max_check_interval(Interval) ->
gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
gen_server:call(?MODULE, {set_max_check_interval, Interval}).
-spec get_disk_free() -> (integer() | 'unknown').
get_disk_free() ->
gen_server:call(?MODULE, get_disk_free, infinity).
safe_ets_lookup(disk_free, unknown).
-spec set_enabled(string()) -> 'ok'.
set_enabled(Enabled) ->
gen_server:call(?MODULE, {set_enabled, Enabled}, infinity).
gen_server:call(?MODULE, {set_enabled, Enabled}).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error().
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
@ -125,18 +117,16 @@ init([Limit]) ->
Dir = dir(),
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
State = #state{dir = Dir,
min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
alarmed = false,
enabled = true,
limit = Limit,
retries = Retries,
interval = Interval},
{ok, enable(State)}.
handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
{reply, Limit, State};
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
State0 = #state{dir = Dir,
alarmed = false,
enabled = true,
limit = Limit,
retries = Retries,
interval = Interval},
State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0),
State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1),
{ok, enable(State2)}.
handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
rabbit_log:info("Cannot set disk free limit: "
@ -146,20 +136,14 @@ handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
handle_call(get_min_check_interval, _From, State) ->
{reply, State#state.min_interval, State};
handle_call(get_max_check_interval, _From, State) ->
{reply, State#state.max_interval, State};
handle_call({set_min_check_interval, MinInterval}, _From, State) ->
{reply, ok, State#state{min_interval = MinInterval}};
{reply, ok, set_min_check_interval(MinInterval, State)};
handle_call({set_max_check_interval, MaxInterval}, _From, State) ->
{reply, ok, State#state{max_interval = MaxInterval}};
handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
{reply, Actual, State};
{reply, ok, set_max_check_interval(MaxInterval, State)};
handle_call({set_enabled, _Enabled = true}, _From, State) ->
start_timer(set_disk_limits(State, State#state.limit)),
@ -194,14 +178,36 @@ code_change(_OldVsn, State, _Extra) ->
%% Server Internals
%%----------------------------------------------------------------------------
%% Reads Key from the monitor's ETS table, returning Default when the key
%% is absent or when the table does not exist yet (ets:lookup raises
%% error:badarg before init/1 has created it). Only the lookup itself is
%% inside the protected section.
safe_ets_lookup(Key, Default) ->
    try ets:lookup(?ETS_NAME, Key) of
        [{Key, Value}] -> Value;
        []             -> Default
    catch
        error:badarg -> Default
    end.
% the partition / drive containing this directory will be monitored
dir() -> rabbit_mnesia:dir().
%% Records the minimum disk check interval both in the ETS table (read
%% lock-free by get_min_check_interval/0) and in the server state.
set_min_check_interval(MinInterval, State) ->
ets:insert(?ETS_NAME, {min_check_interval, MinInterval}),
State#state{min_interval = MinInterval}.
%% Records the maximum disk check interval both in the ETS table (read
%% lock-free by get_max_check_interval/0) and in the server state.
set_max_check_interval(MaxInterval, State) ->
ets:insert(?ETS_NAME, {max_check_interval, MaxInterval}),
State#state{max_interval = MaxInterval}.
%% Interprets Limit0 (interpret_limit/1 also accepts {mem_relative, N}),
%% stores the resulting byte limit in both server state and the ETS table,
%% then runs an immediate disk check so alarms reflect the new limit.
set_disk_limits(State, Limit0) ->
Limit = interpret_limit(Limit0),
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB",
[trunc(Limit / 1000000)]),
ets:insert(?ETS_NAME, {disk_free_limit, Limit}),
internal_update(State1).
internal_update(State = #state { limit = Limit,
@ -219,7 +225,8 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
State #state {alarmed = NewAlarmed, actual = CurrentFree}.
ets:insert(?ETS_NAME, {disk_free, CurrentFree}),
State#state{alarmed = NewAlarmed, actual = CurrentFree}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
@ -227,11 +234,89 @@ get_disk_free(Dir) ->
get_disk_free(Dir, {unix, Sun})
when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
Df = os:find_executable("df"),
parse_free_unix(rabbit_misc:os_cmd(Df ++ " -k " ++ Dir));
parse_free_unix(run_cmd(Df ++ " -k " ++ Dir));
get_disk_free(Dir, {unix, _}) ->
Df = os:find_executable("df"),
parse_free_unix(rabbit_misc:os_cmd(Df ++ " -kP " ++ Dir));
parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
% Dir:
% "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia"
case win32_get_drive_letter(Dir) of
error ->
rabbit_log:warning("Expected the mnesia directory absolute "
"path to start with a drive letter like "
"'C:'. The path is: '~p'", [Dir]),
case win32_get_disk_free_dir(Dir) of
{ok, Free} ->
Free;
_ -> exit(could_not_determine_disk_free)
end;
DriveLetter ->
case win32_get_disk_free_fsutil(DriveLetter) of
{ok, Free0} -> Free0;
error ->
case win32_get_disk_free_pwsh(DriveLetter) of
{ok, Free1} -> Free1;
_ -> exit(could_not_determine_disk_free)
end
end
end.
%% Extracts the available space, in bytes, from `df -k` style output:
%% the fourth whitespace-separated field of the second line is the free
%% space in KiB, which is scaled by 1024. Exits with {unparseable, Str}
%% when the output does not have that shape.
parse_free_unix(Str) ->
    case string:tokens(Str, "\n") of
        [_Header, DataLine | _] ->
            case string:tokens(DataLine, " \t") of
                [_Fs, _Total, _Used, FreeKiB | _] ->
                    list_to_integer(FreeKiB) * 1024;
                _ ->
                    exit({unparseable, Str})
            end;
        _ ->
            exit({unparseable, Str})
    end.
%% Returns the drive letter of an absolute Windows path written with
%% forward slashes, e.g. "c:/Users/...", or the atom 'error' when the
%% path does not start with "<letter>:/".
win32_get_drive_letter([Letter, $:, $/ | _])
  when Letter >= $a, Letter =< $z;
       Letter >= $A, Letter =< $Z ->
    Letter;
win32_get_drive_letter(_) ->
    error.
%% Queries free space for a drive via `fsutil volume diskfree <letter>:`,
%% whose first output line looks like:
%%   Total free bytes        :   812,733,878,272 (756.9 GB)
%% Returns {ok, Bytes} on success, or 'error' when the command times out
%% or fsutil reports a failure (output beginning with "Error").
win32_get_disk_free_fsutil(DriveLetter)
  when DriveLetter >= $a, DriveLetter =< $z;
       DriveLetter >= $A, DriveLetter =< $Z ->
    Cmd = "fsutil.exe volume diskfree " ++ [DriveLetter] ++ ":",
    case run_cmd(Cmd) of
        {error, timeout} ->
            error;
        Output ->
            case string:slice(Output, 0, 5) of
                "Error" ->
                    error;
                "Total" ->
                    %% the free-byte count on the first line may be
                    %% comma-grouped; strip the commas before parsing
                    [FirstLine | _] = string:tokens(Output, "\r\n"),
                    {match, [Grouped]} =
                        re:run(FirstLine, "(\\d+,?)+",
                               [{capture, first, list}]),
                    Digits = lists:append(string:tokens(Grouped, ",")),
                    {ok, list_to_integer(Digits)}
            end
    end.
%% Queries free space for a drive via PowerShell's Get-PSDrive, which
%% prints the free byte count followed by a trailing \r\n. Returns
%% {ok, Bytes}, or 'error' when the command times out.
win32_get_disk_free_pwsh(DriveLetter)
  when DriveLetter >= $a, DriveLetter =< $z;
       DriveLetter >= $A, DriveLetter =< $Z ->
    Cmd = "powershell.exe -NoLogo -NoProfile -NonInteractive -Command "
          "(Get-PSDrive " ++ [DriveLetter] ++ ").Free",
    case run_cmd(Cmd) of
        {error, timeout} ->
            error;
        Output ->
            %% drop the trailing \r\n before converting
            Trimmed = string:slice(Output, 0, length(Output) - 2),
            {ok, list_to_integer(Trimmed)}
    end.
win32_get_disk_free_dir(Dir) ->
%% On Windows, the Win32 API enforces a limit of 260 characters
%% (MAX_PATH). If we call `dir` with a path longer than that, it
%% fails with "File not found". Starting with Windows 10 version
@ -253,22 +338,11 @@ get_disk_free(Dir, {win32, _}) ->
%% See the following page to learn more about this:
%% https://ss64.com/nt/syntax-filenames.html
RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all),
parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ RawDir ++ "\"")).
parse_free_unix(Str) ->
case string:tokens(Str, "\n") of
[_, S | _] -> case string:tokens(S, " \t") of
[_, _, _, Free | _] -> list_to_integer(Free) * 1024;
_ -> exit({unparseable, Str})
end;
_ -> exit({unparseable, Str})
end.
parse_free_win32(CommandResult) ->
CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""),
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
{match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
[{capture, all_but_first, list}]),
list_to_integer(lists:reverse(Free)).
{ok, list_to_integer(lists:reverse(Free))}.
interpret_limit({mem_relative, Relative})
when is_number(Relative) ->
@ -318,3 +392,20 @@ enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries}
erlang:send_after(Interval, self(), try_enable),
State#state{enabled = false}
end.
%% Runs Cmd via rabbit_misc:os_cmd/1 in a separate process so a hung
%% external command cannot block the calling process. Returns the
%% command's output, or {error, timeout} if it did not complete within
%% 5 seconds.
run_cmd(Cmd) ->
    Self = self(),
    Ref = make_ref(),
    CmdPid = spawn(fun() ->
                       Self ! {Self, Ref, rabbit_misc:os_cmd(Cmd)}
                   end),
    receive
        {Self, Ref, CmdResult} ->
            CmdResult
    after 5000 ->
        exit(CmdPid, kill),
        rabbit_log:error("Command timed out: '~s'", [Cmd]),
        %% The worker may have sent its result just before being killed;
        %% drain any such late message so it does not linger in the
        %% caller's mailbox (and end up as a stray handle_info in a
        %% gen_server). The unique Ref guarantees we only discard ours.
        receive
            {Self, Ref, _Late} -> ok
        after 0 ->
            ok
        end,
        {error, timeout}
    end.

View File

@ -40,7 +40,7 @@ is_file(File) ->
is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
is_dir_no_handle(Dir) -> is_dir_internal(file:read_file_info(Dir, [raw])).
is_dir_internal({ok, #file_info{type=directory}}) -> true;
is_dir_internal(_) -> false.
@ -83,14 +83,23 @@ wildcard(Pattern, Dir) ->
list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end).
read_file_info(File) ->
with_handle(fun () -> prim_file:read_file_info(File) end).
with_handle(fun () -> file:read_file_info(File, [raw]) end).
-spec read_term_file
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()).
read_term_file(File) ->
try
{ok, Data} = with_handle(fun () -> prim_file:read_file(File) end),
F = fun() ->
{ok, FInfo} = file:read_file_info(File, [raw]),
{ok, Fd} = file:open(File, [read, raw, binary]),
try
file:read(Fd, FInfo#file_info.size)
after
file:close(Fd)
end
end,
{ok, Data} = with_handle(F),
{ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
TokenGroups = group_tokens(Tokens),
{ok, [begin

View File

@ -64,7 +64,11 @@ node_health_check(rabbit_node_monitor) ->
end;
node_health_check(alarms) ->
case proplists:get_value(alarms, rabbit:status()) of
% Note:
% Removed call to rabbit:status/0 here due to a memory leak on win32,
% plus it uses an excessive amount of resources
% Alternative to https://github.com/rabbitmq/rabbitmq-server/pull/3893
case rabbit:alarms() of
[] ->
ok;
Alarms ->

View File

@ -78,6 +78,8 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
%% down `rabbit_sup` and the whole `rabbit` app.
[]
end,
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, COffs},

View File

@ -176,4 +176,6 @@ merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal);
%% use operator policy value for booleans
merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal.

View File

@ -846,18 +846,22 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% This can happen during recovery
%% we need to re-initialise the queue record
%% if the stream id is a match
[Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
case amqqueue:get_type_state(Q) of
#{name := S} when S == StreamId ->
rabbit_log:debug("~s: initializing queue record for stream id ~s",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
case mnesia:dirty_read(rabbit_durable_queue, QName) of
[] ->
%% queue not found at all, it must have been deleted
ok;
_ ->
ok
end,
send_self_command({mnesia_updated, StreamId, Args});
[Q] ->
case amqqueue:get_type_state(Q) of
#{name := S} when S == StreamId ->
rabbit_log:debug("~s: initializing queue record for stream id ~s",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok;
_ ->
ok
end,
send_self_command({mnesia_updated, StreamId, Args})
end;
_ ->
send_self_command({mnesia_updated, StreamId, Args})
catch _:E ->

View File

@ -46,7 +46,10 @@ groups() ->
import_case13,
import_case14,
import_case15,
import_case16
import_case16,
import_case17,
import_case18,
import_case19
]},
{boot_time_import_using_classic_source, [], [
@ -236,6 +239,36 @@ import_case16(Config) ->
{skip, "Should not run in mixed version environments"}
end.
import_case17(Config) -> import_invalid_file_case(Config, "failing_case17").
import_case18(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, user_limits) of
ok ->
import_file_case(Config, "case18"),
User = <<"limited_guest">>,
UserIsImported =
fun () ->
case user_lookup(Config, User) of
{error, not_found} -> false;
_ -> true
end
end,
rabbit_ct_helpers:await_condition(UserIsImported, 20000),
{ok, UserRec} = user_lookup(Config, User),
?assertEqual(#{<<"max-connections">> => 2}, internal_user:get_limits(UserRec)),
ok;
Skip ->
Skip
end;
_ ->
%% skip the test in mixed version mode
{skip, "Should not run in mixed version environments"}
end.
import_case19(Config) -> import_invalid_file_case(Config, "failing_case19").
export_import_round_trip_case1(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false ->
@ -382,3 +415,6 @@ queue_lookup(Config, VHost, Name) ->
vhost_lookup(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]).
user_lookup(Config, User) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, lookup_user, [User]).

View File

@ -0,0 +1,46 @@
{
"bindings": [],
"exchanges": [],
"global_parameters": [
{
"name": "cluster_name",
"value": "rabbitmq@localhost"
}
],
"parameters": [],
"permissions": [
{
"configure": ".*",
"read": ".*",
"user": "guest",
"vhost": "/",
"write": ".*"
}
],
"policies": [],
"queues": [],
"rabbit_version": "3.9.1",
"rabbitmq_version": "3.9.1",
"topic_permissions": [],
"users": [
{
"hashing_algorithm": "rabbit_password_hashing_sha256",
"limits": {"max-connections" : 2},
"name": "limited_guest",
"password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6",
"tags": [
"administrator"
]
}
],
"vhosts": [
{
"limits": [],
"name": "/"
},
{
"limits": [],
"name": "tagged"
}
]
}

View File

@ -0,0 +1,19 @@
{
"vhosts": [
{
"name": "\/"
}
],
"policies": [
{
"vhost": "\/",
"pattern": "^project-nd-ns-",
"apply-to": "queues",
"definition": {
"expires": 120000,
"max-length": 10000
},
"priority": 1
}
]
}

View File

@ -0,0 +1,46 @@
{
"bindings": [],
"exchanges": [],
"global_parameters": [
{
"name": "cluster_name",
"value": "rabbitmq@localhost"
}
],
"parameters": [],
"permissions": [
{
"configure": ".*",
"read": ".*",
"user": "guest",
"vhost": "/",
"write": ".*"
}
],
"policies": [],
"queues": [],
"rabbit_version": "3.9.1",
"rabbitmq_version": "3.9.1",
"topic_permissions": [],
"users": [
{
"hashing_algorithm": "rabbit_password_hashing_sha256",
"limits": {"max-connections" : "twomincepies"},
"name": "limited_guest",
"password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6",
"tags": [
"administrator"
]
}
],
"vhosts": [
{
"limits": [],
"name": "/"
},
{
"limits": [],
"name": "tagged"
}
]
}

View File

@ -29,6 +29,7 @@ groups() ->
confirm_nowait,
confirm_ack,
confirm_acks,
confirm_after_mandatory_bug,
confirm_mandatory_unroutable,
confirm_unroutable_message],
[
@ -187,6 +188,17 @@ confirm_acks(Config) ->
publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>, <<"msg4">>]),
receive_many(lists:seq(1, 4)).
confirm_after_mandatory_bug(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName,
mandatory = true}, #amqp_msg{payload = <<"msg1">>}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
publish(Ch, QName, [<<"msg2">>]),
true = amqp_channel:wait_for_confirms(Ch, 1),
ok.
%% For unroutable messages, the broker will issue a confirm once the exchange verifies a message
%% won't route to any queue (returns an empty list of queues).
%% If the message is also published as mandatory, the basic.return is sent to the client before

View File

@ -11,7 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("src/rabbit_fifo.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").
%%%===================================================================
%%% Common Test callbacks

View File

@ -67,6 +67,12 @@ set_disk_free_limit_command(Config) ->
?MODULE, set_disk_free_limit_command1, [Config]).
set_disk_free_limit_command1(_Config) ->
F = fun () ->
DiskFree = rabbit_disk_monitor:get_disk_free(),
DiskFree =/= unknown
end,
rabbit_ct_helpers:await_condition(F),
%% Use an integer
rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1}),
disk_free_limit_to_total_memory_ratio_is(1),
@ -84,7 +90,8 @@ set_disk_free_limit_command1(_Config) ->
passed.
disk_free_limit_to_total_memory_ratio_is(MemRatio) ->
DiskFreeLimit = rabbit_disk_monitor:get_disk_free_limit(),
ExpectedLimit = MemRatio * vm_memory_monitor:get_total_memory(),
% Total memory is unstable, so checking order
true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2,
true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98.
true = ExpectedLimit/DiskFreeLimit < 1.2,
true = ExpectedLimit/DiskFreeLimit > 0.98.

View File

@ -88,7 +88,7 @@ disk_monitor_enable1(_Config) ->
application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
undefined = rabbit_disk_monitor:get_disk_free(),
unknown = rabbit_disk_monitor:get_disk_free(),
Cmd = case os:type() of
{win32, _} -> " Le volume dans le lecteur C na pas de nom.\n"
" Le numéro de série du volume est 707D-5BDC\n"

View File

@ -21,7 +21,8 @@ all() ->
groups() ->
[
{parallel_tests, [parallel], [
merge_operator_policy_definitions
merge_operator_policy_definitions,
conflict_resolution_for_booleans
]}
].
@ -102,6 +103,54 @@ merge_operator_policy_definitions(_Config) ->
[{definition, [
{<<"message-ttl">>, 3000}
]}])
),
).
passed.
conflict_resolution_for_booleans(_Config) ->
?assertEqual(
[
{<<"remote-dc-replicate">>, true}
],
rabbit_policy:merge_operator_definitions(
#{definition => #{
<<"remote-dc-replicate">> => true
}},
[{definition, [
{<<"remote-dc-replicate">>, true}
]}])),
?assertEqual(
[
{<<"remote-dc-replicate">>, false}
],
rabbit_policy:merge_operator_definitions(
#{definition => #{
<<"remote-dc-replicate">> => false
}},
[{definition, [
{<<"remote-dc-replicate">>, false}
]}])),
?assertEqual(
[
{<<"remote-dc-replicate">>, true}
],
rabbit_policy:merge_operator_definitions(
#{definition => #{
<<"remote-dc-replicate">> => false
}},
[{definition, [
{<<"remote-dc-replicate">>, true}
]}])),
?assertEqual(
[
{<<"remote-dc-replicate">>, false}
],
rabbit_policy:merge_operator_definitions(
#{definition => #{
<<"remote-dc-replicate">> => true
}},
[{definition, [
{<<"remote-dc-replicate">>, false}
]}])).

View File

@ -139,6 +139,46 @@ In that case, the configuration will look like this:
NOTE: `jwks_url` takes precedence over `signing_keys` if both are provided.
### Variables Configurable in rabbitmq.conf
| Key | Documentation
|------------------------------------------|-----------
| `auth_oauth2.resource_server_id` | [The Resource Server ID](#resource-server-id-and-scope-prefixes)
| `auth_oauth2.additional_scopes_key` | Configure the plugin to also look in other fields (maps to `additional_rabbitmq_scopes` in the old format).
| `auth_oauth2.default_key` | ID of the default signing key.
| `auth_oauth2.signing_keys` | Paths to signing key files.
| `auth_oauth2.jwks_url`                   | The URL of the key server. According to the [JWT Specification](https://datatracker.ietf.org/doc/html/rfc7515#section-4.1.2), the key server URL must use https.
| `auth_oauth2.https.cacertfile` | Path to a file containing PEM-encoded CA certificates. The CA certificates are used during key server [peer verification](https://rabbitmq.com/ssl.html#peer-verification).
| `auth_oauth2.https.depth` | The maximum number of non-self-issued intermediate certificates that may follow the peer certificate in a valid [certification path](https://rabbitmq.com/ssl.html#peer-verification-depth). Default is 10.
| `auth_oauth2.https.peer_verification` | Should [peer verification](https://rabbitmq.com/ssl.html#peer-verification) be enabled. Available values: `verify_none`, `verify_peer`. Default is `verify_none`. It is recommended to configure `verify_peer`. Peer verification requires a certain amount of setup and is more secure.
| `auth_oauth2.https.fail_if_no_peer_cert` | Used together with `auth_oauth2.https.peer_verification = verify_peer`. When set to `true`, TLS connection will be rejected if client fails to provide a certificate. Default is `false`.
| `auth_oauth2.https.hostname_verification`| Enable wildcard-aware hostname verification for the key server. Available values: `wildcard`, `none`. Default is `none`.
| `auth_oauth2.algorithms` | Restrict [the usable algorithms](https://github.com/potatosalad/erlang-jose#algorithm-support).
For example:
Configure with key files
```
auth_oauth2.resource_server_id = new_resource_server_id
auth_oauth2.additional_scopes_key = my_custom_scope_key
auth_oauth2.default_key = id1
auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem
auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem
auth_oauth2.algorithms.1 = HS256
auth_oauth2.algorithms.2 = RS256
```
Configure with key server
```
auth_oauth2.resource_server_id = new_resource_server_id
auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json
auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
auth_oauth2.https.peer_verification = verify_peer
auth_oauth2.https.depth = 5
auth_oauth2.https.fail_if_no_peer_cert = true
auth_oauth2.https.hostname_verification = wildcard
auth_oauth2.algorithms.1 = HS256
auth_oauth2.algorithms.2 = RS256
```
### Resource Server ID and Scope Prefixes
OAuth 2.0 (and thus UAA-provided) tokens use scopes to communicate what set of permissions particular

View File

@ -77,3 +77,52 @@
end, Settings),
maps:from_list(SigningKeys)
end}.
{mapping,
"auth_oauth2.jwks_url",
"rabbitmq_auth_backend_oauth2.key_config.jwks_url",
[{datatype, string}, {validators, ["uri", "https_uri"]}]}.
{mapping,
"auth_oauth2.https.peer_verification",
"rabbitmq_auth_backend_oauth2.key_config.peer_verification",
[{datatype, {enum, [verify_peer, verify_none]}}]}.
{mapping,
"auth_oauth2.https.cacertfile",
"rabbitmq_auth_backend_oauth2.key_config.cacertfile",
[{datatype, file}, {validators, ["file_accessible"]}]}.
{mapping,
"auth_oauth2.https.depth",
"rabbitmq_auth_backend_oauth2.key_config.depth",
[{datatype, integer}]}.
{mapping,
"auth_oauth2.https.hostname_verification",
"rabbitmq_auth_backend_oauth2.key_config.hostname_verification",
[{datatype, {enum, [wildcard, none]}}]}.
{mapping,
"auth_oauth2.https.crl_check",
"rabbitmq_auth_backend_oauth2.key_config.crl_check",
[{datatype, {enum, [true, false, peer, best_effort]}}]}.
{mapping,
"auth_oauth2.https.fail_if_no_peer_cert",
"rabbitmq_auth_backend_oauth2.key_config.fail_if_no_peer_cert",
[{datatype, {enum, [true, false]}}]}.
{validator, "https_uri", "According to the JWT Specification, Key Server URL must be https.",
fun(Uri) -> string:nth_lexeme(Uri, 1, "://") == "https" end}.
{mapping,
"auth_oauth2.algorithms.$algorithm",
"rabbitmq_auth_backend_oauth2.key_config.algorithms",
[{datatype, string}]}.
{translation, "rabbitmq_auth_backend_oauth2.key_config.algorithms",
fun(Conf) ->
Settings = cuttlefish_variable:filter_by_prefix("auth_oauth2.algorithms", Conf),
[list_to_binary(V) || {_, V} <- Settings]
end}.

View File

@ -0,0 +1,27 @@
-module(uaa_jwks).
-export([get/1]).

%% Fetch the JWKS (JSON Web Key Set) document from the given key server URL.
%% Returns the raw httpc result: {ok, {StatusLine, Headers, Body}} on success
%% or {error, Reason} on failure. TLS behaviour is driven by the
%% `key_config' section of the rabbitmq_auth_backend_oauth2 application
%% environment (see ssl_options/0).
-spec get(string() | binary()) -> {ok, term()} | {error, term()}.
get(JwksUrl) ->
    %% 60s request timeout: the key server is only contacted on key refresh,
    %% so a generous timeout is acceptable here.
    httpc:request(get, {JwksUrl, []}, [{ssl, ssl_options()}, {timeout, 60000}], []).

%% Build the ssl options used when contacting the key server, taken from the
%% `key_config' application environment with conservative defaults
%% (verify_none, depth 10, no CRL checking).
-spec ssl_options() -> list().
ssl_options() ->
    UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []),
    PeerVerification = proplists:get_value(peer_verification, UaaEnv, verify_none),
    Depth = proplists:get_value(depth, UaaEnv, 10),
    FailIfNoPeerCert = proplists:get_value(fail_if_no_peer_cert, UaaEnv, false),
    CrlCheck = proplists:get_value(crl_check, UaaEnv, false),
    SslOpts0 = [{verify, PeerVerification},
                {depth, Depth},
                {fail_if_no_peer_cert, FailIfNoPeerCert},
                {crl_check, CrlCheck},
                {crl_cache, {ssl_crl_cache, {internal, [{http, 10000}]}}}],
    %% Only include cacertfile when one is actually configured: passing
    %% {cacertfile, undefined} makes the ssl application reject the option
    %% list at connect time.
    SslOpts1 = case proplists:get_value(cacertfile, UaaEnv) of
                   undefined  -> SslOpts0;
                   CaCertFile -> [{cacertfile, CaCertFile} | SslOpts0]
               end,
    case proplists:get_value(hostname_verification, UaaEnv, none) of
        wildcard ->
            %% Wildcard-aware hostname check (as used for https endpoints).
            [{customize_hostname_check,
              [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]}
             | SslOpts1];
        none ->
            SslOpts1
    end.

View File

@ -58,7 +58,7 @@ update_jwks_signing_keys() ->
undefined ->
{error, no_jwks_url};
JwksUrl ->
case httpc:request(JwksUrl) of
case uaa_jwks:get(JwksUrl) of
{ok, {_, _, JwksBody}} ->
KeyList = maps:get(<<"keys">>, jose:decode(erlang:iolist_to_binary(JwksBody)), []),
Keys = maps:from_list(lists:map(fun(Key) -> {maps:get(<<"kid">>, Key, undefined), {json, Key}} end, KeyList)),

View File

@ -24,7 +24,15 @@ decode(Token) ->
end.
decode_and_verify(Jwk, Token) ->
case jose_jwt:verify(Jwk, Token) of
UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []),
Verify =
case proplists:get_value(algorithms, UaaEnv) of
undefined ->
jose_jwt:verify(Jwk, Token);
Algs ->
jose_jwt:verify_strict(Jwk, Algs, Token)
end,
case Verify of
{true, #jose_jwt{fields = Fields}, _} -> {true, Fields};
{false, #jose_jwt{fields = Fields}, _} -> {false, Fields}
end.

View File

@ -4,7 +4,16 @@
auth_oauth2.additional_scopes_key = my_custom_scope_key
auth_oauth2.default_key = id1
auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem
auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem",
auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem
auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json
auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
auth_oauth2.https.peer_verification = verify_none
auth_oauth2.https.depth = 5
auth_oauth2.https.fail_if_no_peer_cert = false
auth_oauth2.https.hostname_verification = wildcard
auth_oauth2.https.crl_check = true
auth_oauth2.algorithms.1 = HS256
auth_oauth2.algorithms.2 = RS256",
[
{rabbitmq_auth_backend_oauth2, [
{resource_server_id,<<"new_resource_server_id">>},
@ -16,7 +25,15 @@
<<"id1">> => {pem, <<"I'm not a certificate">>},
<<"id2">> => {pem, <<"I'm not a certificate">>}
}
}
},
{jwks_url, "https://my-jwt-issuer/jwks.json"},
{cacertfile, "test/config_schema_SUITE_data/certs/cacert.pem"},
{peer_verification, verify_none},
{depth, 5},
{fail_if_no_peer_cert, false},
{hostname_verification, wildcard},
{crl_check, true},
{algorithms, [<<"HS256">>, <<"RS256">>]}
]
}
]}

View File

@ -21,7 +21,9 @@
all() ->
[
{group, happy_path},
{group, unhappy_path}
{group, unhappy_path},
{group, unvalidated_jwks_server},
{group, no_peer_verification}
].
groups() ->
@ -34,6 +36,7 @@ groups() ->
test_successful_connection_with_complex_claim_as_a_list,
test_successful_connection_with_complex_claim_as_a_binary,
test_successful_connection_with_keycloak_token,
test_successful_connection_with_algorithm_restriction,
test_successful_token_refresh
]},
{unhappy_path, [], [
@ -41,9 +44,12 @@ groups() ->
test_failed_connection_with_a_non_token,
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
test_failed_connection_with_a_token_with_insufficient_resource_permission,
test_failed_connection_with_algorithm_restriction,
test_failed_token_refresh_case1,
test_failed_token_refresh_case2
]}
]},
{unvalidated_jwks_server, [], [test_failed_connection_with_unvalidated_jwks_server]},
{no_peer_verification, [], [{group, happy_path}, {group, unhappy_path}]}
].
%%
@ -69,23 +75,35 @@ end_per_suite(Config) ->
fun stop_jwks_server/1
] ++ rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(no_peer_verification, Config) ->
add_vhosts(Config),
KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(non_strict_jwks_url, Config)}, {peer_verification, verify_none}]),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig});
init_per_group(_Group, Config) ->
%% The broker is managed by {init,end}_per_testcase().
lists:foreach(fun(Value) ->
rabbit_ct_broker_helpers:add_vhost(Config, Value)
end,
[<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]),
add_vhosts(Config),
Config.
end_per_group(no_peer_verification, Config) ->
delete_vhosts(Config),
KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(strict_jwks_url, Config)}, {peer_verification, verify_peer}]),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig});
end_per_group(_Group, Config) ->
%% The broker is managed by {init,end}_per_testcase().
lists:foreach(fun(Value) ->
rabbit_ct_broker_helpers:delete_vhost(Config, Value)
end,
[<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]),
delete_vhosts(Config),
Config.
add_vhosts(Config) ->
%% The broker is managed by {init,end}_per_testcase().
lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:add_vhost(Config, Value) end,
[<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]).
delete_vhosts(Config) ->
%% The broker is managed by {init,end}_per_testcase().
lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:delete_vhost(Config, Value) end,
[<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]).
init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_a_full_permission_token_and_explicitly_configured_vhost orelse
Testcase =:= test_successful_token_refresh ->
@ -107,6 +125,24 @@ init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction ->
KeyConfig = ?config(key_config, Config),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"HS256">>]} | KeyConfig]]),
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_algorithm_restriction ->
KeyConfig = ?config(key_config, Config),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"RS256">>]} | KeyConfig]]),
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_unvalidated_jwks_server ->
KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), {jwks_url, ?config(non_strict_jwks_url, Config)}),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config.
@ -126,6 +162,14 @@ end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction orelse
Testcase =:= test_failed_connection_with_algorithm_restriction orelse
Testcase =:= test_failed_connection_with_unvalidated_jwks_server ->
rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, ?config(key_config, Config)]),
rabbit_ct_helpers:testcase_finished(Config, Testcase),
Config;
end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>),
rabbit_ct_helpers:testcase_finished(Config, Testcase),
@ -143,13 +187,27 @@ start_jwks_server(Config) ->
%% Assume we don't have more than 100 ports allocated for tests
PortBase = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_ports_base),
JwksServerPort = PortBase + 100,
%% Both URLs direct to the same JWKS server
%% The NonStrictJwksUrl identity cannot be validated while StrictJwksUrl identity can be validated
NonStrictJwksUrl = "https://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks",
StrictJwksUrl = "https://localhost:" ++ integer_to_list(JwksServerPort) ++ "/jwks",
ok = application:set_env(jwks_http, keys, [Jwk]),
{ok, _} = application:ensure_all_started(ssl),
{ok, _} = application:ensure_all_started(cowboy),
ok = jwks_http_app:start(JwksServerPort),
KeyConfig = [{jwks_url, "http://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks"}],
CertsDir = ?config(rmq_certsdir, Config),
ok = jwks_http_app:start(JwksServerPort, CertsDir),
KeyConfig = [{jwks_url, StrictJwksUrl},
{peer_verification, verify_peer},
{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
rabbit_ct_helpers:set_config(Config, {fixture_jwk, Jwk}).
rabbit_ct_helpers:set_config(Config,
[{non_strict_jwks_url, NonStrictJwksUrl},
{strict_jwks_url, StrictJwksUrl},
{key_config, KeyConfig},
{fixture_jwk, Jwk}]).
stop_jwks_server(Config) ->
ok = jwks_http_app:stop(),
@ -305,7 +363,7 @@ test_successful_token_refresh(Config) ->
Conn = open_unmanaged_connection(Config, 0, <<"vhost1">>, <<"username">>, Token),
{ok, Ch} = amqp_connection:open_channel(Conn),
{_Algo, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
{_Algo2, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
<<"rabbitmq.write:vhost1/*">>,
<<"rabbitmq.read:vhost1/*">>]),
?UTIL_MOD:wait_for_token_to_expire(timer:seconds(Duration)),
@ -321,6 +379,13 @@ test_successful_token_refresh(Config) ->
amqp_channel:close(Ch2),
close_connection_and_channel(Conn, Ch).
test_successful_connection_with_algorithm_restriction(Config) ->
{_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
Conn = open_unmanaged_connection(Config, 0, <<"username">>, Token),
{ok, Ch} = amqp_connection:open_channel(Conn),
#'queue.declare_ok'{queue = _} =
amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
close_connection_and_channel(Conn, Ch).
test_failed_connection_with_expired_token(Config) ->
{_Algo, Token} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
@ -359,7 +424,7 @@ test_failed_token_refresh_case1(Config) ->
#'queue.declare_ok'{queue = _} =
amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
{_Algo, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>,
{_Algo2, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>,
<<"rabbitmq.write:vhost4/*">>,
<<"rabbitmq.read:vhost4/*">>]),
%% the error is communicated asynchronously via a connection-level error
@ -387,3 +452,13 @@ test_failed_token_refresh_case2(Config) ->
amqp_connection:open_channel(Conn)),
close_connection(Conn).
test_failed_connection_with_algorithm_restriction(Config) ->
{_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
?assertMatch({error, {auth_failure, _}},
open_unmanaged_connection(Config, 0, <<"username">>, Token)).
test_failed_connection_with_unvalidated_jwks_server(Config) ->
{_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
?assertMatch({error, {auth_failure, _}},
open_unmanaged_connection(Config, 0, <<"username">>, Token)).

View File

@ -1,8 +1,8 @@
-module(jwks_http_app).
-export([start/1, stop/0]).
-export([start/2, stop/0]).
start(Port) ->
start(Port, CertsDir) ->
Dispatch =
cowboy_router:compile(
[
@ -11,8 +11,10 @@ start(Port) ->
]}
]
),
{ok, _} = cowboy:start_clear(jwks_http_listener,
[{port, Port}],
{ok, _} = cowboy:start_tls(jwks_http_listener,
[{port, Port},
{certfile, filename:join([CertsDir, "server", "cert.pem"])},
{keyfile, filename:join([CertsDir, "server", "key.pem"])}],
#{env => #{dispatch => Dispatch}}),
ok.

View File

@ -73,7 +73,7 @@ expired_token_with_scopes(Scopes) ->
token_with_scopes_and_expiration(Scopes, os:system_time(seconds) - 10).
fixture_token_with_scopes(Scopes) ->
token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 10).
token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 30).
token_with_scopes_and_expiration(Scopes, Expiration) ->
%% expiration is a timestamp with precision in seconds

View File

@ -87,7 +87,9 @@ delete_super_stream(VirtualHost, Name, Username) ->
gen_server:call(?MODULE,
{delete_super_stream, VirtualHost, Name, Username}).
-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
-spec lookup_leader(binary(), binary()) ->
{ok, pid()} | {error, not_available} |
{error, not_found}.
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
@ -294,20 +296,25 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
LeaderPid = amqqueue:get_pid(Q),
case process_alive(LeaderPid) of
true ->
LeaderPid;
{ok, LeaderPid};
false ->
case leader_from_members(Q) of
{ok, Pid} ->
Pid;
{ok, Pid};
_ ->
cluster_not_found
{error, not_available}
end
end;
_ ->
cluster_not_found
{error, not_found}
end;
_ ->
cluster_not_found
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
not_found ->
{error, not_found};
_ ->
{error, not_available}
end
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From,

View File

@ -1494,7 +1494,7 @@ handle_frame_post_auth(Transport,
of
{false, false} ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
{error, not_found} ->
response(Transport,
Connection0,
declare_publisher,
@ -1504,6 +1504,16 @@ handle_frame_post_auth(Transport,
?STREAM_DOES_NOT_EXIST,
1),
{Connection0, State};
{error, not_available} ->
response(Transport,
Connection0,
declare_publisher,
CorrelationId,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE),
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_NOT_AVAILABLE,
1),
{Connection0, State};
{ClusterLeader,
#stream_connection{publishers = Publishers0,
publisher_to_ids = RefIds0} =
@ -1960,9 +1970,9 @@ handle_frame_post_auth(_Transport,
of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
rabbit_log:warning("Could not find leader to store offset on ~p",
[Stream]),
{error, Error} ->
rabbit_log:warning("Could not find leader to store offset on ~p: ~p",
[Stream, Error]),
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
{Connection, State};
{ClusterLeader, Connection1} ->
@ -1992,11 +2002,16 @@ handle_frame_post_auth(Transport,
of
ok ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
{error, not_found} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_DOES_NOT_EXIST,
1),
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0};
{error, not_available} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_NOT_AVAILABLE,
1),
{?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0};
{LeaderPid, C} ->
{RC, O} =
case osiris:read_tracking(LeaderPid, Reference) of
@ -2532,9 +2547,9 @@ lookup_leader(Stream,
case maps:get(Stream, StreamLeaders, undefined) of
undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of
cluster_not_found ->
cluster_not_found;
LeaderPid ->
{error, Error} ->
{error, Error};
{ok, LeaderPid} ->
Connection1 =
maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid,

View File

@ -27,9 +27,9 @@
<properties>
<stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<assertj.version>3.21.0</assertj.version>
<logback.version>1.2.6</logback.version>
<logback.version>1.2.7</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.2.0</spotless.version>

View File

@ -16,6 +16,9 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode;
import static org.assertj.core.api.Assertions.assertThat;
import com.rabbitmq.stream.impl.Client;
@ -40,8 +43,7 @@ public class ClusterSizeTest {
String s = UUID.randomUUID().toString();
Response response =
client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize));
assertThat(response.isOk()).isFalse();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
}
@ParameterizedTest
@ -53,7 +55,7 @@ public class ClusterSizeTest {
try {
Response response =
client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize));
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
StreamMetadata metadata = client.metadata(s).get(s);
assertThat(metadata).isNotNull();
assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);

View File

@ -16,11 +16,16 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.TestUtils.waitAtMost;
import static com.rabbitmq.stream.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.Response;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
@ -66,7 +71,7 @@ public class FailureTest {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
streamMetadata = client.metadata(stream).get(stream);
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
@ -107,7 +112,7 @@ public class FailureTest {
assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
// wait until there's a new leader
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(10),
() -> {
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
@ -133,7 +138,7 @@ public class FailureTest {
}
// wait until all the replicas are there
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(10),
() -> {
LOGGER.info("Getting metadata for {}", stream);
@ -164,7 +169,7 @@ public class FailureTest {
consumeLatch.countDown();
}));
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.Response response =
@ -219,7 +224,7 @@ public class FailureTest {
cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until there's a new leader
try {
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream);
@ -314,7 +319,7 @@ public class FailureTest {
Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until all the replicas are there
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
@ -350,7 +355,7 @@ public class FailureTest {
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(generations).hasSize(2).contains(0L, 1L);
@ -372,8 +377,7 @@ public class FailureTest {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
TestUtils.waitUntil(
() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
waitUntil(() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
metadata = metadataClient.metadata(stream);
streamMetadata = metadata.get(stream);
@ -497,7 +501,7 @@ public class FailureTest {
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
// let's publish for a bit of time
Thread.sleep(2000);
@ -521,7 +525,7 @@ public class FailureTest {
confirmedCount = confirmed.size();
// wait until all the replicas are there
TestUtils.waitAtMost(
waitAtMost(
Duration.ofSeconds(10),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
@ -535,9 +539,9 @@ public class FailureTest {
keepPublishing.set(false);
assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
waitAtMost(Duration.ofSeconds(10), () -> consumed.size() >= confirmed.size());
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
@ -551,4 +555,33 @@ public class FailureTest {
confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
}
@Test
// Regression test: after a broker restart, declaring a publisher on an
// existing stream must never surface STREAM_DOES_NOT_EXIST while the
// stream machinery is still recovering — only transient failures that
// eventually resolve to OK are acceptable.
void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Exception {
try {
Host.rabbitmqctl("stop_app");
} finally {
// restart the node even if stop_app failed, so the suite can continue
Host.rabbitmqctl("start_app");
}
AtomicReference<Client> client = new AtomicReference<>();
// Poll until the node accepts client connections again; connection
// attempts throw while the broker is still starting, so exceptions
// are deliberately swallowed and the poll retries.
waitUntil(
() -> {
try {
client.set(cf.get(new ClientParameters().port(TestUtils.streamPortNode1())));
} catch (Exception e) {
}
return client.get() != null;
});
Set<Short> responseCodes = ConcurrentHashMap.newKeySet();
// Retry declarePublisher until it succeeds, recording every response
// code observed along the way.
waitUntil(
() -> {
Response response = client.get().declarePublisher((byte) 0, null, stream);
responseCodes.add(response.getResponseCode());
return response.isOk();
});
// None of the intermediate responses may claim the stream is gone.
assertThat(responseCodes).doesNotContain(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
}
}

View File

@ -16,6 +16,9 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@ -47,8 +50,7 @@ public class LeaderLocatorTest {
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
String s = UUID.randomUUID().toString();
Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo"));
assertThat(response.isOk()).isFalse();
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
}
@Test
@ -60,7 +62,7 @@ public class LeaderLocatorTest {
try {
Response response =
client.create(s, Collections.singletonMap("queue-leader-locator", "client-local"));
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
StreamMetadata metadata = client.metadata(s).get(s);
assertThat(metadata).isNotNull();
assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
@ -136,7 +138,7 @@ public class LeaderLocatorTest {
Response response =
client.create(
s, Collections.singletonMap("queue-leader-locator", "least-leaders"));
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
createdStreams.add(s);
});

View File

@ -16,11 +16,13 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Client.Response;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
@ -30,6 +32,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.*;
@ -106,7 +109,7 @@ public class TestUtils {
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.create(stream);
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
client.close();
store(context).put("testMethodStream", stream);
} catch (NoSuchFieldException e) {
@ -136,7 +139,7 @@ public class TestUtils {
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.delete(stream);
assertThat(response.isOk()).isTrue();
assertThat(response).is(ok());
client.close();
store(context).remove("testMethodStream");
} catch (NoSuchFieldException e) {
@ -197,4 +200,22 @@ public class TestUtils {
}
}
}
// AssertJ Conditions over stream Client.Response values, giving assertion
// failures a descriptive message instead of a bare boolean mismatch.
static class ResponseConditions {

    // Matches a successful response.
    static Condition<Response> ok() {
        return new Condition<>(Response::isOk, "Response should be OK");
    }

    // Matches a failed response. Bug fix: the description previously read
    // "Response should be OK", which produced a misleading failure message
    // whenever a ko() assertion failed.
    static Condition<Response> ko() {
        return new Condition<>(response -> !response.isOk(), "Response should be KO");
    }

    // Matches a response with the given response code; the expected code is
    // interpolated into the failure message via the %d placeholder.
    static Condition<Response> responseCode(short expectedResponse) {
        return new Condition<>(
            response -> response.getResponseCode() == expectedResponse,
            "response code %d",
            expectedResponse);
    }
}
}

View File

@ -16,7 +16,7 @@ all() ->
[{group, non_parallel_tests}].
groups() ->
[{non_parallel_tests, [], [manage_super_stream]}].
[{non_parallel_tests, [], [manage_super_stream, lookup_leader]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@ -71,6 +71,17 @@ end_per_testcase(Testcase, Config) ->
%% Testcases.
%% -------------------------------------------------------------------
%% Verifies rabbit_stream_manager:lookup_leader/2 (via the RPC helper):
%% an existing stream resolves to its leader pid, an unknown stream name
%% yields {error, not_found}, and the stream is cleaned up afterwards.
lookup_leader(Config) ->
Stream = <<"stream_manager_lookup_leader_stream">>,
?assertMatch({ok, _}, create_stream(Config, Stream)),
{ok, Pid} = lookup_leader(Config, Stream),
?assert(is_pid(Pid)),
%% an unknown stream must report not_found rather than crash
?assertEqual({error, not_found}, lookup_leader(Config, <<"foo">>)),
%% delete the stream created above so the node is left clean
?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
manage_super_stream(Config) ->
% create super stream
?assertEqual(ok,
@ -140,6 +151,20 @@ create_stream(Config, Name) ->
create,
[<<"/">>, Name, [], <<"guest">>]).
delete_stream(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
delete,
[<<"/">>, Name, <<"guest">>]).
lookup_leader(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
lookup_leader,
[<<"/">>, Name]).
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,

View File

@ -27,11 +27,11 @@
<properties>
<stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<assertj.version>3.21.0</assertj.version>
<okhttp.version>4.9.2</okhttp.version>
<okhttp.version>4.9.3</okhttp.version>
<gson.version>2.8.9</gson.version>
<logback.version>1.2.6</logback.version>
<logback.version>1.2.7</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.2.0</spotless.version>

View File

@ -15,7 +15,9 @@ deps_dirs:
- bazel-bin/external/*
include_dirs:
- deps
- deps/*
- deps/*/include
- deps/*/src
- bazel-bin/external
- bazel-bin/external/*/include
plt_path: bazel-bin/deps/rabbit/.base_plt.plt