amqp_client: Emit 'connection.blocked' in direct connections

Previously direct 0-9-1 connections did not notice when memory or disk
alarms were set. This could allow an 0-9-1 shovel where the destination
is a direct connection to completely overload a broker which is already
in alarm. With this change, direct connections register the connection
process with `rabbit_alarm` and emit `connection.blocked` and
`connection.unblocked` to the blocked handler if one is registered.
`rabbit_amqp091_shovel` already respects the `connection.blocked`, so
the destination will not receive any messages.
This commit is contained in:
Michael Davis 2025-10-01 16:40:34 -04:00
parent 24e2575bee
commit 9393ec9db5
No known key found for this signature in database
3 changed files with 59 additions and 6 deletions

View File

@ -31,6 +31,7 @@
server_properties,
%% connection.block, connection.unblock handler
block_handler,
blocked_by = sets:new([{version, 2}]),
closing = false %% #closing{} | false
}).
@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) ->
server_misbehaved_close(AmqpError, State);
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
server_initiated_close(Close, State);
handle_cast({register_blocked_handler, HandlerPid}, State) ->
handle_cast({register_blocked_handler, HandlerPid},
#state{blocked_by = BlockedBy} = State) ->
Ref = erlang:monitor(process, HandlerPid),
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
State1 = State#state{block_handler = {HandlerPid, Ref}},
%% If an alarm is already active, immediately block the handler.
_ = case sets:is_empty(BlockedBy) of
false ->
HandlerPid ! #'connection.blocked'{};
true ->
ok
end,
{noreply, State1};
handle_cast({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy} = State) ->
WasNotBlocked = sets:is_empty(BlockedBy),
BlockedBy1 = case Conserve of
true ->
sets:add_element(Source, BlockedBy);
false ->
sets:del_element(Source, BlockedBy)
end,
State1 = State#state{blocked_by = BlockedBy1},
case sets:is_empty(BlockedBy1) of
true ->
handle_method(#'connection.unblocked'{}, State1);
false when WasNotBlocked ->
handle_method(#'connection.blocked'{}, State1);
false ->
{noreply, State1}
end.
%% @private
handle_info({'DOWN', _, process, BlockHandler, Reason},

View File

@ -13,7 +13,8 @@
-deprecated([{force_event_refresh, 1, eventually}]).
%% Internal
-export([list_local/0]).
-export([list_local/0,
conserve_resources/3]).
%% For testing only
-export([extract_extra_auth_props/4]).
@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
ok -> ok = pg_local:join(rabbit_direct, Pid),
rabbit_core_metrics:connection_created(Pid, Infos),
rabbit_event:notify(connection_created, Infos),
_ = rabbit_alarm:register(
Pid, {?MODULE, conserve_resources, []}),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = Reason = not_allowed} ->
@ -252,3 +255,9 @@ disconnect(Pid, Infos) ->
pg_local:leave(rabbit_direct, Pid),
rabbit_core_metrics:connection_closed(Pid),
rabbit_event:notify(connection_closed, Infos).
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> 'ok'.
conserve_resources(ChannelPid, Source, {_, Conserve, _}) ->
gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}).

View File

@ -16,12 +16,20 @@
all() ->
[
{group, all_tests}
{group, network_connection},
{group, direct_connection}
].
groups() ->
[
{all_tests, [], all_tests()}
{network_connection, [], [
dest_resource_alarm_on_confirm,
dest_resource_alarm_on_publish,
dest_resource_alarm_no_ack
]},
{direct_connection, [], [
dest_resource_alarm_on_confirm
]}
].
all_tests() ->
@ -51,13 +59,21 @@ end_per_suite(Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
init_per_group(network_connection, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
{shovel_source_idx, 1},
{shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)},
{shovel_dest_idx, 0}
]);
init_per_group(direct_connection, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
{shovel_source_idx, 1},
{shovel_dest_uri, <<"amqp://">>},
{shovel_dest_idx, 0}
]).
end_per_group(_, Config) ->