Notify user who performed the action in all system events

This commit is contained in:
Diana Corbacho 2017-01-15 20:23:27 +00:00
parent 7b1f550c17
commit 7e3aae8c1a
16 changed files with 275 additions and 197 deletions

View File

@ -821,17 +821,20 @@ insert_default_data() ->
DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm), DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm),
DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm), DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm),
ok = rabbit_vhost:add(DefaultVHostBin), ok = rabbit_vhost:add(DefaultVHostBin, ?INTERNAL_USER),
ok = rabbit_auth_backend_internal:add_user( ok = rabbit_auth_backend_internal:add_user(
DefaultUserBin, DefaultUserBin,
DefaultPassBin DefaultPassBin,
?INTERNAL_USER
), ),
ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin,DefaultTags), ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin, DefaultTags,
?INTERNAL_USER),
ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin, ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin,
DefaultVHostBin, DefaultVHostBin,
DefaultConfigurePermBin, DefaultConfigurePermBin,
DefaultWritePermBin, DefaultWritePermBin,
DefaultReadPermBin), DefaultReadPermBin,
?INTERNAL_USER),
ok. ok.
%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------

View File

@ -131,7 +131,8 @@
auto_delete, auto_delete,
arguments, arguments,
owner_pid, owner_pid,
exclusive exclusive,
user_who_performed_action
]). ]).
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]). -define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
@ -286,7 +287,9 @@ terminate(_Reason, State = #q{q = Q}) ->
terminate_delete(EmitStats, Reason, terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName}, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) -> backing_queue = BQ,
status = Status}) ->
ActingUser = terminated_by(Status),
fun (BQS) -> fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS), BQS1 = BQ:delete_and_terminate(Reason, BQS),
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer, if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
@ -294,11 +297,17 @@ terminate_delete(EmitStats, Reason,
true -> ok true -> ok
end, end,
%% don't care if the internal delete doesn't return 'ok'. %% don't care if the internal delete doesn't return 'ok'.
rabbit_amqqueue:internal_delete(QName), rabbit_amqqueue:internal_delete(QName, ActingUser),
BQS1 BQS1
end. end.
terminate_shutdown(Fun, State) -> terminated_by({terminated_by, ActingUser}) ->
ActingUser;
terminated_by(_) ->
?INTERNAL_USER.
terminate_shutdown(Fun, #q{status = Status} = State) ->
ActingUser = terminated_by(Status),
State1 = #q{backing_queue_state = BQS, consumers = Consumers} = State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State, lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1, [fun stop_sync_timer/1,
@ -310,7 +319,7 @@ terminate_shutdown(Fun, State) ->
_ -> ok = rabbit_memory_monitor:deregister(self()), _ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State), QName = qname(State),
notify_decorators(shutdown, State), notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) || [emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
{Ch, CTag, _, _, _} <- {Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)], rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)} State1#q{backing_queue_state = Fun(BQS)}
@ -731,7 +740,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ok, State1}; {ok, State1};
{ChAckTags, ChCTags, Consumers1} -> {ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1), QName = qname(State1),
[emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags], [emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
Holder1 = case Holder of Holder1 = case Holder of
{DownPid, _} -> none; {DownPid, _} -> none;
Other -> Other Other -> Other
@ -953,6 +962,8 @@ i(garbage_collection, _State) ->
i(reductions, _State) -> i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions), {reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions; Reductions;
i(user_who_performed_action, #q{q = #amqqueue{options = Opts}}) ->
maps:get(user, Opts);
i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:info(Item, BQS). BQ:info(Item, BQS).
@ -970,7 +981,7 @@ emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ All). rabbit_event:notify(queue_stats, Extra ++ All).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
PrefetchCount, Args, Ref) -> PrefetchCount, Args, Ref, ActingUser) ->
rabbit_event:notify(consumer_created, rabbit_event:notify(consumer_created,
[{consumer_tag, CTag}, [{consumer_tag, CTag},
{exclusive, Exclusive}, {exclusive, Exclusive},
@ -978,15 +989,17 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
{channel, ChPid}, {channel, ChPid},
{queue, QName}, {queue, QName},
{prefetch_count, PrefetchCount}, {prefetch_count, PrefetchCount},
{arguments, Args}], {arguments, Args},
{user_who_performed_action, ActingUser}],
Ref). Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) -> emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted, rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag}, [{consumer_tag, ConsumerTag},
{channel, ChPid}, {channel, ChPid},
{queue, QName}]). {queue, QName},
{user_who_performed_action, ActingUser}]).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -996,7 +1009,7 @@ prioritise_call(Msg, _From, _Len, State) ->
{info, _Items} -> 9; {info, _Items} -> 9;
consumers -> 9; consumers -> 9;
stat -> 7; stat -> 7;
{basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); {basic_consume, _, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
{basic_cancel, _, _, _} -> consumer_bias(State); {basic_cancel, _, _, _} -> consumer_bias(State);
_ -> 0 _ -> 0
end. end.
@ -1096,7 +1109,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end; end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
_From, State = #q{consumers = Consumers, _From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) -> exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of case check_exclusive_access(Holder, ExclusiveConsume, State) of
@ -1105,7 +1118,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ChPid, ConsumerTag, NoAck, ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive, LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State), PrefetchCount, Args, is_empty(State),
Consumers), ActingUser, Consumers),
ExclusiveConsumer = ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag}; if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder true -> Holder
@ -1121,12 +1134,12 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, Args), PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount, AckRequired, QName, PrefetchCount,
Args, none), Args, none, ActingUser),
notify_decorators(State1), notify_decorators(State1),
reply(ok, run_message_queue(State1)) reply(ok, run_message_queue(State1))
end; end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
State = #q{consumers = Consumers, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) -> exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg), ok = maybe_send_reply(ChPid, OkMsg),
@ -1140,7 +1153,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end, end,
State1 = State#q{consumers = Consumers1, State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1}, exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1), notify_decorators(State1),
case should_auto_delete(State1) of case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1)); false -> reply(ok, ensure_expiry_timer(State1));
@ -1159,14 +1172,15 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State), ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1); reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From, handle_call({delete, IfUnused, IfEmpty, ActingUser}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS), IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State), IsUnused = is_unused(State),
if if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop({ok, BQ:len(BQS)}, State) true -> stop({ok, BQ:len(BQS)},
State#q{status = {terminated_by, ActingUser}})
end; end;
handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call(purge, _From, State = #q{backing_queue = BQ,
@ -1321,14 +1335,16 @@ handle_cast({force_event_refresh, Ref},
QName = qname(State), QName = qname(State),
AllConsumers = rabbit_queue_consumers:all(Consumers), AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of case Exclusive of
none -> [emit_consumer_created( none ->
Ch, CTag, false, AckRequired, QName, Prefetch, [emit_consumer_created(
Args, Ref) || Ch, CTag, false, AckRequired, QName, Prefetch,
{Ch, CTag, AckRequired, Prefetch, Args} Args, Ref, ActingUser) ||
<- AllConsumers]; {Ch, CTag, AckRequired, Prefetch, Args, ActingUser}
{Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, <- AllConsumers];
emit_consumer_created( {Ch, CTag} ->
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) [{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers,
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser)
end, end,
noreply(rabbit_event:init_stats_timer(State, #q.stats_timer)); noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));

View File

@ -17,11 +17,11 @@
-module(rabbit_binding). -module(rabbit_binding).
-include("rabbit.hrl"). -include("rabbit.hrl").
-export([recover/2, exists/1, add/1, add/2, remove/1, remove/2, list/1]). -export([recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]).
-export([list_for_source/1, list_for_destination/1, -export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]). list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3, -export([new_deletions/0, combine_deletions/2, add_deletion/3,
process_deletions/1]). process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
%% these must all be run inside a mnesia tx %% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1, -export([has_for_source/1, remove_for_source/1,
@ -57,10 +57,10 @@
-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) -> -spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
'ok'. 'ok'.
-spec exists(rabbit_types:binding()) -> boolean() | bind_errors(). -spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
-spec add(rabbit_types:binding()) -> bind_res(). -spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
-spec add(rabbit_types:binding(), inner_fun()) -> bind_res(). -spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
-spec remove(rabbit_types:binding()) -> bind_res(). -spec remove(rabbit_types:binding()) -> bind_res().
-spec remove(rabbit_types:binding(), inner_fun()) -> bind_res(). -spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
-spec list(rabbit_types:vhost()) -> bindings(). -spec list(rabbit_types:vhost()) -> bindings().
-spec list_for_source -spec list_for_source
(rabbit_types:binding_source()) -> bindings(). (rabbit_types:binding_source()) -> bindings().
@ -84,7 +84,7 @@
(rabbit_types:binding_destination(), boolean()) -> deletions(). (rabbit_types:binding_destination(), boolean()) -> deletions().
-spec remove_transient_for_destination -spec remove_transient_for_destination
(rabbit_types:binding_destination()) -> deletions(). (rabbit_types:binding_destination()) -> deletions().
-spec process_deletions(deletions()) -> rabbit_misc:thunk('ok'). -spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
-spec combine_deletions(deletions(), deletions()) -> deletions(). -spec combine_deletions(deletions(), deletions()) -> deletions().
-spec add_deletion -spec add_deletion
(rabbit_exchange:name(), (rabbit_exchange:name(),
@ -158,9 +158,9 @@ exists(Binding) ->
rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
end, fun not_found_or_absent_errs/1). end, fun not_found_or_absent_errs/1).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
add(Binding, InnerFun) -> add(Binding, InnerFun, ActingUser) ->
binding_action( binding_action(
Binding, Binding,
fun (Src, Dst, B) -> fun (Src, Dst, B) ->
@ -172,7 +172,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of case InnerFun(Src, Dst) of
ok -> ok ->
case mnesia:read({rabbit_route, B}) of case mnesia:read({rabbit_route, B}) of
[] -> add(Src, Dst, B); [] -> add(Src, Dst, B, ActingUser);
[_] -> fun () -> ok end [_] -> fun () -> ok end
end; end;
{error, _} = Err -> {error, _} = Err ->
@ -183,7 +183,7 @@ add(Binding, InnerFun) ->
end end
end, fun not_found_or_absent_errs/1). end, fun not_found_or_absent_errs/1).
add(Src, Dst, B) -> add(Src, Dst, B, ActingUser) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
case (SrcDurable andalso DstDurable andalso case (SrcDurable andalso DstDurable andalso
mnesia:read({rabbit_durable_route, B}) =/= []) of mnesia:read({rabbit_durable_route, B}) =/= []) of
@ -193,14 +193,16 @@ add(Src, Dst, B) ->
Serial = rabbit_exchange:serial(Src), Serial = rabbit_exchange:serial(Src),
fun () -> fun () ->
x_callback(Serial, Src, add_binding, B), x_callback(Serial, Src, add_binding, B),
ok = rabbit_event:notify(binding_created, info(B)) ok = rabbit_event:notify(
binding_created,
info(B) ++ [{user_who_performed_action, ActingUser}])
end; end;
true -> rabbit_misc:const({error, binding_not_found}) true -> rabbit_misc:const({error, binding_not_found})
end. end.
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER).
remove(Binding, InnerFun) -> remove(Binding, InnerFun, ActingUser) ->
binding_action( binding_action(
Binding, Binding,
fun (Src, Dst, B) -> fun (Src, Dst, B) ->
@ -210,18 +212,18 @@ remove(Binding, InnerFun) ->
_ -> rabbit_misc:const({error, binding_not_found}) _ -> rabbit_misc:const({error, binding_not_found})
end; end;
_ -> case InnerFun(Src, Dst) of _ -> case InnerFun(Src, Dst) of
ok -> remove(Src, Dst, B); ok -> remove(Src, Dst, B, ActingUser);
{error, _} = Err -> rabbit_misc:const(Err) {error, _} = Err -> rabbit_misc:const(Err)
end end
end end
end, fun absent_errs_only/1). end, fun absent_errs_only/1).
remove(Src, Dst, B) -> remove(Src, Dst, B, ActingUser) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3), fun mnesia:delete_object/3),
Deletions = maybe_auto_delete( Deletions = maybe_auto_delete(
B#binding.source, [B], new_deletions(), false), B#binding.source, [B], new_deletions(), false),
process_deletions(Deletions). process_deletions(Deletions, ActingUser).
list(VHostPath) -> list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'), VHostResource = rabbit_misc:r(VHostPath, '_'),
@ -539,7 +541,7 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2), anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}. [Bindings1 | Bindings2]}.
process_deletions(Deletions) -> process_deletions(Deletions, ActingUser) ->
AugmentedDeletions = AugmentedDeletions =
dict:map(fun (_XName, {X, deleted, Bindings}) -> dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings), Bs = lists:flatten(Bindings),
@ -553,16 +555,21 @@ process_deletions(Deletions) ->
fun() -> fun() ->
dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
ok = rabbit_event:notify( ok = rabbit_event:notify(
exchange_deleted, [{name, XName}]), exchange_deleted,
del_notify(Bs), [{name, XName},
{user_who_performed_action, ActingUser}]),
del_notify(Bs, ActingUser),
x_callback(Serial, X, delete, Bs); x_callback(Serial, X, delete, Bs);
(_XName, {X, not_deleted, Bs, Serial}, ok) -> (_XName, {X, not_deleted, Bs, Serial}, ok) ->
del_notify(Bs), del_notify(Bs, ActingUser),
x_callback(Serial, X, remove_bindings, Bs) x_callback(Serial, X, remove_bindings, Bs)
end, ok, AugmentedDeletions) end, ok, AugmentedDeletions)
end. end.
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. del_notify(Bs, ActingUser) -> [rabbit_event:notify(
binding_deleted,
info(B) ++ [{user_who_performed_action, ActingUser}])
|| B <- Bs].
x_callback(Serial, X, F, Bs) -> x_callback(Serial, X, F, Bs) ->
ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]). ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).

View File

@ -323,6 +323,7 @@ tracked_connection_from_connection_state(#connection{
{node, node()}, {node, node()},
{vhost, VHost}, {vhost, VHost},
{user, Username}, {user, Username},
{user_who_performed_action, Username},
{connected_at, Ts}, {connected_at, Ts},
{pid, self()}, {pid, self()},
{type, network}, {type, network},

View File

@ -58,7 +58,7 @@ stop() ->
init([DefaultVHost]) -> init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare( #exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
topic, true, false, true, []), topic, true, false, true, [], ?INTERNAL_USER),
{ok, #resource{virtual_host = DefaultVHost, {ok, #resource{virtual_host = DefaultVHost,
kind = exchange, kind = exchange,
name = ?LOG_EXCH_NAME}}. name = ?LOG_EXCH_NAME}}.

View File

@ -18,12 +18,12 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-include("rabbit_framing.hrl"). -include("rabbit_framing.hrl").
-export([recover/0, policy_changed/2, callback/4, declare/6, -export([recover/0, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1, assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/1, immutable/1, update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, delete/2, validate_binding/2]). route/2, delete/3, validate_binding/2]).
%% these must be run inside a mnesia tx %% these must be run inside a mnesia tx
-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). -export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
@ -43,7 +43,7 @@
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
-spec declare -spec declare
(name(), type(), boolean(), boolean(), boolean(), (name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table()) rabbit_framing:amqp_table(), rabbit_types:username())
-> rabbit_types:exchange(). -> rabbit_types:exchange().
-spec check_type -spec check_type
(binary()) -> atom() | rabbit_types:connection_exit(). (binary()) -> atom() | rabbit_types:connection_exit().
@ -86,8 +86,10 @@
-spec route(rabbit_types:exchange(), rabbit_types:delivery()) -spec route(rabbit_types:exchange(), rabbit_types:delivery())
-> [rabbit_amqqueue:name()]. -> [rabbit_amqqueue:name()].
-spec delete -spec delete
(name(), 'true') -> 'ok' | rabbit_types:error('not_found' | 'in_use'); (name(), 'true', rabbit_types:username()) ->
(name(), 'false') -> 'ok' | rabbit_types:error('not_found'). 'ok'| rabbit_types:error('not_found' | 'in_use');
(name(), 'false', rabbit_types:username()) ->
'ok' | rabbit_types:error('not_found').
-spec validate_binding -spec validate_binding
(rabbit_types:exchange(), rabbit_types:binding()) (rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
@ -101,7 +103,7 @@
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
policy]). policy, user_who_performed_action]).
recover() -> recover() ->
Xs = rabbit_misc:table_filter( Xs = rabbit_misc:table_filter(
@ -151,14 +153,15 @@ serial(#exchange{name = XName} = X) ->
(false) -> none (false) -> none
end. end.
declare(XName, Type, Durable, AutoDelete, Internal, Args) -> declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
X = rabbit_exchange_decorator:set( X = rabbit_exchange_decorator:set(
rabbit_policy:set(#exchange{name = XName, rabbit_policy:set(#exchange{name = XName,
type = Type, type = Type,
durable = Durable, durable = Durable,
auto_delete = AutoDelete, auto_delete = AutoDelete,
internal = Internal, internal = Internal,
arguments = Args})), arguments = Args,
options = #{user => Username}})),
XT = type_to_module(Type), XT = type_to_module(Type),
%% We want to upset things if it isn't ok %% We want to upset things if it isn't ok
ok = XT:validate(X), ok = XT:validate(X),
@ -342,6 +345,8 @@ i(policy, X) -> case rabbit_policy:name(X) of
none -> ''; none -> '';
Policy -> Policy Policy -> Policy
end; end;
i(user_who_performed_action, #exchange{options = Opts}) ->
maps:get(user, Opts);
i(Item, #exchange{type = Type} = X) -> i(Item, #exchange{type = Type} = X) ->
case (type_to_module(Type)):info(X, [Item]) of case (type_to_module(Type)):info(X, [Item]) of
[{Item, I}] -> I; [{Item, I}] -> I;
@ -437,7 +442,7 @@ call_with_exchange(XName, Fun) ->
end end
end). end).
delete(XName, IfUnused) -> delete(XName, IfUnused, Username) ->
Fun = case IfUnused of Fun = case IfUnused of
true -> fun conditional_delete/2; true -> fun conditional_delete/2;
false -> fun unconditional_delete/2 false -> fun unconditional_delete/2
@ -449,7 +454,7 @@ delete(XName, IfUnused) ->
%% see rabbitmq/rabbitmq-federation#7 %% see rabbitmq/rabbitmq-federation#7
rabbit_runtime_parameters:set(XName#resource.virtual_host, rabbit_runtime_parameters:set(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name, true, none), XName#resource.name, true, Username),
call_with_exchange( call_with_exchange(
XName, XName,
fun (X) -> fun (X) ->
@ -457,7 +462,7 @@ delete(XName, IfUnused) ->
{deleted, X, Bs, Deletions} -> {deleted, X, Bs, Deletions} ->
rabbit_binding:process_deletions( rabbit_binding:process_deletions(
rabbit_binding:add_deletion( rabbit_binding:add_deletion(
XName, {X, deleted, Bs}, Deletions)); XName, {X, deleted, Bs}, Deletions), Username);
{error, _InUseOrNotFound} = E -> {error, _InUseOrNotFound} = E ->
rabbit_misc:const(E) rabbit_misc:const(E)
end end
@ -465,7 +470,7 @@ delete(XName, IfUnused) ->
after after
rabbit_runtime_parameters:clear(XName#resource.virtual_host, rabbit_runtime_parameters:clear(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name) XName#resource.name, Username)
end. end.
validate_binding(X = #exchange{type = XType}, Binding) -> validate_binding(X = #exchange{type = XType}, Binding) ->

View File

@ -21,7 +21,7 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-export([register/0]). -export([register/0]).
-export([validate/5, notify/4, notify_clear/3]). -export([validate/5, notify/5, notify_clear/4]).
-import(rabbit_misc, [pget/2]). -import(rabbit_misc, [pget/2]).
@ -42,8 +42,8 @@ register() ->
validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) -> validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->
ok. ok.
notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term) -> notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _Username) ->
ok. ok.
notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name) -> notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Username) ->
ok. ok.

View File

@ -43,10 +43,10 @@
-export([register/0]). -export([register/0]).
-export([invalidate/0, recover/0]). -export([invalidate/0, recover/0]).
-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]). -export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]).
-export([validate/5, notify/4, notify_clear/3]). -export([validate/5, notify/5, notify_clear/4]).
-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, -export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]). list_formatted/1, list_formatted/3, info_keys/0]).
-export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1, -export([parse_set_op/7, set_op/7, delete_op/3, lookup_op/2, list_op/0, list_op/1,
list_formatted_op/1, list_formatted_op/3]). list_formatted_op/1, list_formatted_op/3]).
-rabbit_boot_step({?MODULE, -rabbit_boot_step({?MODULE,
@ -198,38 +198,42 @@ invalid_file() ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority,
ApplyTo, ActingUser).
parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo,
ActingUser).
parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
try rabbit_data_coercion:to_integer(Priority) of try rabbit_data_coercion:to_integer(Priority) of
Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo) Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo,
ActingUser)
catch catch
error:badarg -> {error, "~p priority must be a number", [Priority]} error:badarg -> {error, "~p priority must be a number", [Priority]}
end. end.
parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) -> parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo, ActingUser) ->
case rabbit_json:try_decode(Defn) of case rabbit_json:try_decode(Defn) of
{ok, Term} -> {ok, Term} ->
set0(Type, VHost, Name, set0(Type, VHost, Name,
[{<<"pattern">>, Pattern}, [{<<"pattern">>, Pattern},
{<<"definition">>, maps:to_list(Term)}, {<<"definition">>, maps:to_list(Term)},
{<<"priority">>, Priority}, {<<"priority">>, Priority},
{<<"apply-to">>, ApplyTo}]); {<<"apply-to">>, ApplyTo}],
ActingUser);
error -> error ->
{error_string, "JSON decoding error"} {error_string, "JSON decoding error"}
end. end.
set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).
set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).
set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
PolicyProps = [{<<"pattern">>, Pattern}, PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition}, {<<"definition">>, Definition},
{<<"priority">>, case Priority of {<<"priority">>, case Priority of
@ -240,16 +244,16 @@ set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
undefined -> <<"all">>; undefined -> <<"all">>;
_ -> ApplyTo _ -> ApplyTo
end}], end}],
set0(Type, VHost, Name, PolicyProps). set0(Type, VHost, Name, PolicyProps, ActingUser).
set0(Type, VHost, Name, Term) -> set0(Type, VHost, Name, Term, ActingUser) ->
rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none). rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, ActingUser).
delete_op(VHost, Name) -> delete_op(VHost, Name, ActingUser) ->
rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name). rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name, ActingUser).
delete(VHost, Name) -> delete(VHost, Name, ActingUser) ->
rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name, ActingUser).
lookup_op(VHost, Name) -> lookup_op(VHost, Name) ->
case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of
@ -322,18 +326,23 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist( rabbit_parameter_validation:proplist(
Name, operator_policy_validation(), Term). Name, operator_policy_validation(), Term).
notify(VHost, <<"policy">>, Name, Term) -> notify(VHost, <<"policy">>, Name, Term, ActingUser) ->
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
{user_who_performed_action, ActingUser} | Term]),
update_policies(VHost); update_policies(VHost);
notify(VHost, <<"operator_policy">>, Name, Term) -> notify(VHost, <<"operator_policy">>, Name, Term, ActingUser) ->
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
{user_who_performed_action, ActingUser} | Term]),
update_policies(VHost). update_policies(VHost).
notify_clear(VHost, <<"policy">>, Name) -> notify_clear(VHost, <<"policy">>, Name, ActingUser) ->
rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost},
{user_who_performed_action, ActingUser}]),
update_policies(VHost); update_policies(VHost);
notify_clear(VHost, <<"operator_policy">>, Name) -> notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) ->
rabbit_event:notify(operator_policy_cleared, [{name, Name}, {vhost, VHost}]), rabbit_event:notify(operator_policy_cleared,
[{name, Name}, {vhost, VHost},
{user_who_performed_action, ActingUser}]),
update_policies(VHost). update_policies(VHost).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------

View File

@ -51,13 +51,13 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1, -export([parse_set/5, set/5, set_any/5, clear/4, clear_any/4, list/0, list/1,
list_component/1, list/2, list_formatted/1, list_formatted/3, list_component/1, list/2, list_formatted/1, list_formatted/3,
lookup/3, value/3, value/4, info_keys/0, clear_component/1]). lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
-export([parse_set_global/2, set_global/2, value_global/1, value_global/2, -export([parse_set_global/3, set_global/3, value_global/1, value_global/2,
list_global/0, list_global_formatted/0, list_global_formatted/2, list_global/0, list_global_formatted/0, list_global_formatted/2,
lookup_global/1, global_info_keys/0, clear_global/1]). lookup_global/1, global_info_keys/0, clear_global/2]).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -65,15 +65,18 @@
-type ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok'). -type ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok').
-spec parse_set(rabbit_types:vhost(), binary(), binary(), string(), -spec parse_set(rabbit_types:vhost(), binary(), binary(), string(),
rabbit_types:user() | 'none') -> ok_or_error_string(). rabbit_types:user() | rabbit_types:username() | 'none')
-> ok_or_error_string().
-spec set(rabbit_types:vhost(), binary(), binary(), term(), -spec set(rabbit_types:vhost(), binary(), binary(), term(),
rabbit_types:user() | 'none') -> ok_or_error_string(). rabbit_types:user() | rabbit_types:username() | 'none')
-> ok_or_error_string().
-spec set_any(rabbit_types:vhost(), binary(), binary(), term(), -spec set_any(rabbit_types:vhost(), binary(), binary(), term(),
rabbit_types:user() | 'none') -> ok_or_error_string(). rabbit_types:user() | rabbit_types:username() | 'none')
-spec set_global(atom(), term()) -> 'ok'. -> ok_or_error_string().
-spec clear(rabbit_types:vhost(), binary(), binary()) -spec set_global(atom(), term(), rabbit_types:username()) -> 'ok'.
-> ok_thunk_or_error_string(). -spec clear(rabbit_types:vhost(), binary(), binary(), rabbit_types:username())
-spec clear_any(rabbit_types:vhost(), binary(), binary()) -> ok_thunk_or_error_string().
-spec clear_any(rabbit_types:vhost(), binary(), binary(), rabbit_types:username())
-> ok_thunk_or_error_string(). -> ok_thunk_or_error_string().
-spec list() -> [rabbit_types:infos()]. -spec list() -> [rabbit_types:infos()].
-spec list(rabbit_types:vhost() | '_') -> [rabbit_types:infos()]. -spec list(rabbit_types:vhost() | '_') -> [rabbit_types:infos()].
@ -113,19 +116,20 @@ set(_, <<"policy">>, _, _, _) ->
set(VHost, Component, Name, Term, User) -> set(VHost, Component, Name, Term, User) ->
set_any(VHost, Component, Name, Term, User). set_any(VHost, Component, Name, Term, User).
parse_set_global(Name, String) -> parse_set_global(Name, String, ActingUser) ->
Definition = rabbit_data_coercion:to_binary(String), Definition = rabbit_data_coercion:to_binary(String),
case rabbit_json:try_decode(Definition) of case rabbit_json:try_decode(Definition) of
{ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term)); {ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term), ActingUser);
{ok, Term} -> set_global(Name, Term); {ok, Term} -> set_global(Name, Term, ActingUser);
error -> {error_string, "JSON decoding error"} error -> {error_string, "JSON decoding error"}
end. end.
set_global(Name, Term) -> set_global(Name, Term, ActingUser) ->
NameAsAtom = rabbit_data_coercion:to_atom(Name), NameAsAtom = rabbit_data_coercion:to_atom(Name),
mnesia_update(NameAsAtom, Term), mnesia_update(NameAsAtom, Term),
event_notify(parameter_set, none, global, [{name, NameAsAtom}, event_notify(parameter_set, none, global, [{name, NameAsAtom},
{value, Term}]), {value, Term},
{user_who_performed_action, ActingUser}]),
ok. ok.
format_error(L) -> format_error(L) ->
@ -141,15 +145,19 @@ set_any0(VHost, Component, Name, Term, User) ->
case lookup_component(Component) of case lookup_component(Component) of
{ok, Mod} -> {ok, Mod} ->
case flatten_errors( case flatten_errors(
Mod:validate(VHost, Component, Name, Term, User)) of Mod:validate(VHost, Component, Name, Term, get_user(User))) of
ok -> ok ->
case mnesia_update(VHost, Component, Name, Term) of case mnesia_update(VHost, Component, Name, Term) of
{old, Term} -> ok; {old, Term} ->
_ -> event_notify( ok;
parameter_set, VHost, Component, _ ->
[{name, Name}, ActingUser = get_username(User),
{value, Term}]), event_notify(
Mod:notify(VHost, Component, Name, Term) parameter_set, VHost, Component,
[{name, Name},
{value, Term},
{user_who_performed_action, ActingUser}]),
Mod:notify(VHost, Component, Name, Term, ActingUser)
end, end,
ok; ok;
E -> E ->
@ -159,6 +167,19 @@ set_any0(VHost, Component, Name, Term, User) ->
E E
end. end.
%% Validate only an user record as expected by the API before #rabbitmq-event-exchange-10
get_user(#user{} = User) ->
User;
get_user(_) ->
none.
get_username(#user{username = Username}) ->
Username;
get_username(none) ->
?INTERNAL_USER;
get_username(Any) ->
Any.
mnesia_update(Key, Term) -> mnesia_update(Key, Term) ->
rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)). rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)).
@ -176,15 +197,17 @@ mnesia_update_fun(Key, Term) ->
Res Res
end. end.
clear(_, <<"policy">> , _) -> clear(_, <<"policy">> , _, _) ->
{error_string, "policies may not be cleared using this method"}; {error_string, "policies may not be cleared using this method"};
clear(VHost, Component, Name) -> clear(VHost, Component, Name, ActingUser) ->
clear_any(VHost, Component, Name). clear_any(VHost, Component, Name, ActingUser).
clear_global(Key) -> clear_global(Key, ActingUser) ->
KeyAsAtom = rabbit_data_coercion:to_atom(Key), KeyAsAtom = rabbit_data_coercion:to_atom(Key),
Notify = fun() -> Notify = fun() ->
event_notify(parameter_set, none, global, [{name, KeyAsAtom}]), event_notify(parameter_set, none, global,
[{name, KeyAsAtom},
{user_who_performed_action, ActingUser}]),
ok ok
end, end,
case value_global(KeyAsAtom) of case value_global(KeyAsAtom) of
@ -212,13 +235,14 @@ clear_component(Component) ->
ok ok
end. end.
clear_any(VHost, Component, Name) -> clear_any(VHost, Component, Name, ActingUser) ->
Notify = fun () -> Notify = fun () ->
case lookup_component(Component) of case lookup_component(Component) of
{ok, Mod} -> event_notify( {ok, Mod} -> event_notify(
parameter_cleared, VHost, Component, parameter_cleared, VHost, Component,
[{name, Name}]), [{name, Name},
Mod:notify_clear(VHost, Component, Name); {user_who_performed_action, ActingUser}]),
Mod:notify_clear(VHost, Component, Name, ActingUser);
_ -> ok _ -> ok
end end
end, end,

View File

@ -20,14 +20,14 @@
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, -export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]). set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([purge_messages/1]). -export([purge_messages/1]).
-spec add(rabbit_types:vhost()) -> 'ok'. -spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec delete(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean(). -spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()]. -spec list() -> [rabbit_types:vhost()].
@ -46,7 +46,7 @@
-define(INFO_KEYS, [name, tracing]). -define(INFO_KEYS, [name, tracing]).
add(VHostPath) -> add(VHostPath, ActingUser) ->
rabbit_log:info("Adding vhost '~s'~n", [VHostPath]), rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
R = rabbit_misc:execute_mnesia_transaction( R = rabbit_misc:execute_mnesia_transaction(
fun () -> fun () ->
@ -62,7 +62,7 @@ add(VHostPath) ->
(ok, false) -> (ok, false) ->
[rabbit_exchange:declare( [rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name), rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, Internal, []) || Type, true, false, Internal, [], ActingUser) ||
{Name, Type, Internal} <- {Name, Type, Internal} <-
[{<<"">>, direct, false}, [{<<"">>, direct, false},
{<<"amq.direct">>, direct, false}, {<<"amq.direct">>, direct, false},
@ -75,24 +75,26 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic, true}]], {<<"amq.rabbitmq.trace">>, topic, true}]],
ok ok
end), end),
rabbit_event:notify(vhost_created, info(VHostPath)), rabbit_event:notify(vhost_created, info(VHostPath)
++ [{user_who_performed_action, ActingUser}]),
R. R.
delete(VHostPath) -> delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside %% FIXME: We are forced to delete the queues and exchanges outside
%% the TX below. Queue deletion involves sending messages to the queue %% the TX below. Queue deletion involves sending messages to the queue
%% process, which in turn results in further mnesia actions and %% process, which in turn results in further mnesia actions and
%% eventually the termination of that process. Exchange deletion causes %% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX %% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end, QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end,
[assert_benign(rabbit_amqqueue:with(Name, QDelFun)) || [assert_benign(rabbit_amqqueue:with(Name, QDelFun)) ||
#amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)], #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
[assert_benign(rabbit_exchange:delete(Name, false)) || [assert_benign(rabbit_exchange:delete(Name, false, ActingUser)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)], #exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
Funs = rabbit_misc:execute_mnesia_transaction( Funs = rabbit_misc:execute_mnesia_transaction(
with(VHostPath, fun () -> internal_delete(VHostPath) end)), with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
[ok = Fun() || Fun <- Funs], [ok = Fun() || Fun <- Funs],
ok. ok.
@ -117,18 +119,19 @@ assert_benign({error, {absent, Q, _}}) ->
{error, not_found} -> ok {error, not_found} -> ok
end. end.
internal_delete(VHostPath) -> internal_delete(VHostPath, ActingUser) ->
[ok = rabbit_auth_backend_internal:clear_permissions( [ok = rabbit_auth_backend_internal:clear_permissions(
proplists:get_value(user, Info), VHostPath) proplists:get_value(user, Info), VHostPath, ActingUser)
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
TopicPermissions = rabbit_auth_backend_internal:list_vhost_topic_permissions(VHostPath), TopicPermissions = rabbit_auth_backend_internal:list_vhost_topic_permissions(VHostPath),
[ok = rabbit_auth_backend_internal:clear_topic_permissions( [ok = rabbit_auth_backend_internal:clear_topic_permissions(
proplists:get_value(user, TopicPermission), VHostPath) || TopicPermission <- TopicPermissions], proplists:get_value(user, TopicPermission), VHostPath) || TopicPermission <- TopicPermissions],
Fs1 = [rabbit_runtime_parameters:clear(VHostPath, Fs1 = [rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info), proplists:get_value(component, Info),
proplists:get_value(name, Info)) proplists:get_value(name, Info),
ActingUser)
|| Info <- rabbit_runtime_parameters:list(VHostPath)], || Info <- rabbit_runtime_parameters:list(VHostPath)],
Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser)
|| Info <- rabbit_policy:list(VHostPath)], || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}), ok = mnesia:delete({rabbit_vhost, VHostPath}),
purge_messages(VHostPath), purge_messages(VHostPath),

View File

@ -21,10 +21,10 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-export([register/0]). -export([register/0]).
-export([parse_set/2, set/2, clear/1]). -export([parse_set/3, set/3, clear/2]).
-export([list/0, list/1]). -export([list/0, list/1]).
-export([update_limit/3, clear_limit/2, get_limit/2]). -export([update_limit/4, clear_limit/3, get_limit/2]).
-export([validate/5, notify/4, notify_clear/3]). -export([validate/5, notify/5, notify_clear/4]).
-export([connection_limit/1, queue_limit/1, -export([connection_limit/1, queue_limit/1,
is_over_queue_limit/1, is_over_connection_limit/1]). is_over_queue_limit/1, is_over_connection_limit/1]).
@ -45,12 +45,15 @@ validate(_VHost, <<"vhost-limits">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist( rabbit_parameter_validation:proplist(
Name, vhost_limit_validation(), Term). Name, vhost_limit_validation(), Term).
notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) -> notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]), rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}
| Limits]),
update_vhost(VHost, Limits). update_vhost(VHost, Limits).
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]), rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
update_vhost(VHost, undefined). update_vhost(VHost, undefined).
connection_limit(VirtualHost) -> connection_limit(VirtualHost) ->
@ -128,38 +131,38 @@ is_over_queue_limit(VirtualHost) ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
parse_set(VHost, Defn) -> parse_set(VHost, Defn, ActingUser) ->
Definition = rabbit_data_coercion:to_binary(Defn), Definition = rabbit_data_coercion:to_binary(Defn),
case rabbit_json:try_decode(Definition) of case rabbit_json:try_decode(Definition) of
{ok, Term} -> {ok, Term} ->
set(VHost, maps:to_list(Term)); set(VHost, maps:to_list(Term), ActingUser);
error -> error ->
{error_string, "JSON decoding error"} {error_string, "JSON decoding error"}
end. end.
set(VHost, Defn) -> set(VHost, Defn, ActingUser) ->
rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>, rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>,
<<"limits">>, Defn, none). <<"limits">>, Defn, ActingUser).
clear(VHost) -> clear(VHost, ActingUser) ->
rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>, rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>,
<<"limits">>). <<"limits">>, ActingUser).
update_limit(VHost, Name, Value) -> update_limit(VHost, Name, Value, ActingUser) ->
OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of
[] -> []; [] -> [];
[Param] -> pget(value, Param, []) [Param] -> pget(value, Param, [])
end, end,
NewDef = [{Name, Value} | lists:keydelete(Name, 1, OldDef)], NewDef = [{Name, Value} | lists:keydelete(Name, 1, OldDef)],
set(VHost, NewDef). set(VHost, NewDef, ActingUser).
clear_limit(VHost, Name) -> clear_limit(VHost, Name, ActingUser) ->
OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of
[] -> []; [] -> [];
[Param] -> pget(value, Param, []) [Param] -> pget(value, Param, [])
end, end,
NewDef = lists:keydelete(Name, 1, OldDef), NewDef = lists:keydelete(Name, 1, OldDef),
set(VHost, NewDef). set(VHost, NewDef, ActingUser).
vhost_limit_validation() -> vhost_limit_validation() ->
[{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional}, [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},

View File

@ -20,7 +20,7 @@
-include("rabbit.hrl"). -include("rabbit.hrl").
-export([validate/5, notify/4, notify_clear/3]). -export([validate/5, notify/5, notify_clear/4]).
-export([register/0, unregister/0]). -export([register/0, unregister/0]).
-export([validate_policy/1]). -export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]). -export([register_policy_validator/0, unregister_policy_validator/0]).
@ -43,8 +43,8 @@ validate(_, <<"test">>, <<"admin">>, _Term, User) ->
end; end;
validate(_, <<"test">>, _, _, _) -> {error, "meh", []}. validate(_, <<"test">>, _, _, _) -> {error, "meh", []}.
notify(_, _, _, _) -> ok. notify(_, _, _, _, _) -> ok.
notify_clear(_, _, _) -> ok. notify_clear(_, _, _, _) -> ok.
%---------------------------------------------------------------------------- %----------------------------------------------------------------------------

View File

@ -223,7 +223,7 @@ vhost_deletion(Config) ->
rabbit_ct_broker_helpers:set_ha_policy_all(Config), rabbit_ct_broker_helpers:set_ha_policy_all(Config),
ACh = rabbit_ct_client_helpers:open_channel(Config, A), ACh = rabbit_ct_client_helpers:open_channel(Config, A),
amqp_channel:call(ACh, #'queue.declare'{queue = <<"vhost_deletion-q">>}), amqp_channel:call(ACh, #'queue.declare'{queue = <<"vhost_deletion-q">>}),
ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>]), ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
ok. ok.
promote_on_shutdown(Config) -> promote_on_shutdown(Config) ->

View File

@ -245,7 +245,8 @@ unset_location_config(Config) ->
declare(Config, QueueName, Durable, AutoDelete, Args, Owner) -> declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{new, Queue} = rpc:call(Node, rabbit_amqqueue, declare, {new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
[QueueName, Durable, AutoDelete, Args, Owner]), [QueueName, Durable, AutoDelete, Args, Owner,
<<"acting-user">>]),
Queue. Queue.
verify_min_master(Config, Q) -> verify_min_master(Config, Q) ->

View File

@ -70,13 +70,13 @@ topic_permission_database_access(Config) ->
topic_permission_database_access1(_Config) -> topic_permission_database_access1(_Config) ->
0 = length(ets:tab2list(rabbit_topic_permission)), 0 = length(ets:tab2list(rabbit_topic_permission)),
rabbit_vhost:add(<<"/">>), rabbit_vhost:add(<<"/">>, <<"acting-user">>),
rabbit_vhost:add(<<"other-vhost">>), rabbit_vhost:add(<<"other-vhost">>, <<"acting-user">>),
rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>), rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>, <<"acting-user">>),
rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>), rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>, <<"acting-user">>),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"/">>, <<"amq.topic">>, "^a" <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", <<"acting-user">>
), ),
1 = length(ets:tab2list(rabbit_topic_permission)), 1 = length(ets:tab2list(rabbit_topic_permission)),
1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@ -88,7 +88,7 @@ topic_permission_database_access1(_Config) ->
1 = length(rabbit_auth_backend_internal:list_topic_permissions()), 1 = length(rabbit_auth_backend_internal:list_topic_permissions()),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*" <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", <<"acting-user">>
), ),
2 = length(ets:tab2list(rabbit_topic_permission)), 2 = length(ets:tab2list(rabbit_topic_permission)),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@ -100,10 +100,10 @@ topic_permission_database_access1(_Config) ->
2 = length(rabbit_auth_backend_internal:list_topic_permissions()), 2 = length(rabbit_auth_backend_internal:list_topic_permissions()),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"/">>, <<"topic1">>, "^a" <<"guest">>, <<"/">>, <<"topic1">>, "^a", <<"acting-user">>
), ),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"/">>, <<"topic2">>, "^a" <<"guest">>, <<"/">>, <<"topic2">>, "^a", <<"acting-user">>
), ),
4 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 4 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@ -111,25 +111,29 @@ topic_permission_database_access1(_Config) ->
1 = length(rabbit_auth_backend_internal:list_user_vhost_topic_permissions(<<"guest">>,<<"other-vhost">>)), 1 = length(rabbit_auth_backend_internal:list_user_vhost_topic_permissions(<<"guest">>,<<"other-vhost">>)),
4 = length(rabbit_auth_backend_internal:list_topic_permissions()), 4 = length(rabbit_auth_backend_internal:list_topic_permissions()),
rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"other-vhost">>), rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"other-vhost">>,
<<"acting-user">>),
0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)), 0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)),
3 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 3 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>, <<"topic1">>), rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>, <<"topic1">>,
<<"acting-user">>),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>), rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>,
<<"acting-user">>),
0 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 0 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions( {error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
<<"non-existing-user">>, <<"other-vhost">>, <<"amq.topic">>, ".*" <<"non-existing-user">>, <<"other-vhost">>, <<"amq.topic">>, ".*", <<"acting-user">>
)), )),
{error, {no_such_vhost, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions( {error, {no_such_vhost, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*" <<"guest">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*", <<"acting-user">>
)), )),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions( {error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
<<"non-existing-user">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*" <<"non-existing-user">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*",
<<"acting-user">>
)), )),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:list_user_topic_permissions( {error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:list_user_topic_permissions(
@ -141,7 +145,7 @@ topic_permission_database_access1(_Config) ->
)), )),
{error, {invalid_regexp, _, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions( {error, {invalid_regexp, _, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"/">>, <<"amq.topic">>, "[" <<"guest">>, <<"/">>, <<"amq.topic">>, "[", <<"acting-user">>
)), )),
ok. ok.
@ -159,11 +163,11 @@ topic_permission_checks1(_Config) ->
#vhost{virtual_host = <<"other-vhost">>}, #vhost{virtual_host = <<"other-vhost">>},
write) write)
end), end),
rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>), rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>, <<"acting-user">>),
rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>), rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>, <<"acting-user">>),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"/">>, <<"amq.topic">>, "^a" <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", <<"acting-user">>
), ),
1 = length(ets:tab2list(rabbit_topic_permission)), 1 = length(ets:tab2list(rabbit_topic_permission)),
1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@ -172,7 +176,7 @@ topic_permission_checks1(_Config) ->
0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)), 0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)),
rabbit_auth_backend_internal:set_topic_permissions( rabbit_auth_backend_internal:set_topic_permissions(
<<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*" <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", <<"acting-user">>
), ),
2 = length(ets:tab2list(rabbit_topic_permission)), 2 = length(ets:tab2list(rabbit_topic_permission)),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)), 2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@ -214,4 +218,4 @@ topic_permission_checks1(_Config) ->
write, write,
Context Context
), ),
ok. ok.

View File

@ -794,7 +794,7 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
rabbit_amqqueue:declare( rabbit_amqqueue:declare(
queue_name(Config, queue_name(Config,
<<"bq_variable_queue_delete_msg_store_files_callback-q">>), <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
true, false, [], none), true, false, [], none, <<"acting-user">>),
Payload = <<0:8388608>>, %% 1MB Payload = <<0:8388608>>, %% 1MB
Count = 30, Count = 30,
publish_and_confirm(Q, Payload, Count), publish_and_confirm(Q, Payload, Count),
@ -811,7 +811,7 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
%% give the queue a second to receive the close_fds callback msg %% give the queue a second to receive the close_fds callback msg
timer:sleep(1000), timer:sleep(1000),
rabbit_amqqueue:delete(Q, false, false), rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
passed. passed.
bq_queue_recover(Config) -> bq_queue_recover(Config) ->
@ -822,7 +822,7 @@ bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0), Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} = {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
true, false, [], none), true, false, [], none, <<"acting-user">>),
publish_and_confirm(Q, <<>>, Count), publish_and_confirm(Q, <<>>, Count),
SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid), SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid),
@ -848,7 +848,7 @@ bq_queue_recover1(Config) ->
rabbit_variable_queue:fetch(true, VQ1), rabbit_variable_queue:fetch(true, VQ1),
CountMinusOne = rabbit_variable_queue:len(VQ2), CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
ok = rabbit_amqqueue:internal_delete(QName) ok = rabbit_amqqueue:internal_delete(QName, <<"acting-user">>)
end), end),
passed. passed.
@ -2166,12 +2166,12 @@ change_password1(_Config) ->
UserName = <<"test_user">>, UserName = <<"test_user">>,
Password = <<"test_password">>, Password = <<"test_password">>,
case rabbit_auth_backend_internal:lookup_user(UserName) of case rabbit_auth_backend_internal:lookup_user(UserName) of
{ok, _} -> rabbit_auth_backend_internal:delete_user(UserName); {ok, _} -> rabbit_auth_backend_internal:delete_user(UserName, <<"acting-user">>);
_ -> ok _ -> ok
end, end,
ok = application:set_env(rabbit, password_hashing_module, ok = application:set_env(rabbit, password_hashing_module,
rabbit_password_hashing_md5), rabbit_password_hashing_md5),
ok = rabbit_auth_backend_internal:add_user(UserName, Password), ok = rabbit_auth_backend_internal:add_user(UserName, Password, <<"acting-user">>),
{ok, #auth_user{username = UserName}} = {ok, #auth_user{username = UserName}} =
rabbit_auth_backend_internal:user_login_authentication( rabbit_auth_backend_internal:user_login_authentication(
UserName, [{password, Password}]), UserName, [{password, Password}]),
@ -2182,7 +2182,8 @@ change_password1(_Config) ->
UserName, [{password, Password}]), UserName, [{password, Password}]),
NewPassword = <<"test_password1">>, NewPassword = <<"test_password1">>,
ok = rabbit_auth_backend_internal:change_password(UserName, NewPassword), ok = rabbit_auth_backend_internal:change_password(UserName, NewPassword,
<<"acting-user">>),
{ok, #auth_user{username = UserName}} = {ok, #auth_user{username = UserName}} =
rabbit_auth_backend_internal:user_login_authentication( rabbit_auth_backend_internal:user_login_authentication(
UserName, [{password, NewPassword}]), UserName, [{password, NewPassword}]),
@ -3004,14 +3005,14 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
fun () -> fun () ->
{new, #amqqueue{name = QueueName, pid = QPid}} = {new, #amqqueue{name = QueueName, pid = QPid}} =
rabbit_amqqueue:declare(QueueName, false, false, [], rabbit_amqqueue:declare(QueueName, false, false, [],
none), none, <<"acting-user">>),
exit(QPid, kill), exit(QPid, kill),
Self ! {self(), killed, QPid} Self ! {self(), killed, QPid}
end), end),
receive receive
{Pid, killed, OldPid} -> {Pid, killed, OldPid} ->
Q = dead_queue_loop(QueueName, OldPid), Q = dead_queue_loop(QueueName, OldPid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false), {ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
passed passed
after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end. end.
@ -3038,9 +3039,9 @@ refresh_events1(Config, SecondaryNode) ->
{new, #amqqueue{name = QName} = Q} = {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>), rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>),
false, false, [], none), false, false, [], none, <<"acting-user">>),
expect_events(name, QName, queue_created), expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false), rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
dummy_event_receiver:stop(), dummy_event_receiver:stop(),
passed. passed.
@ -3074,7 +3075,8 @@ must_exit(Fun) ->
end. end.
dead_queue_loop(QueueName, OldPid) -> dead_queue_loop(QueueName, OldPid) ->
{existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none), {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
<<"acting-user">>),
case Q#amqqueue.pid of case Q#amqqueue.pid of
OldPid -> timer:sleep(25), OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid); dead_queue_loop(QueueName, OldPid);