diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 5681845779..886a06d45f 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -31,6 +31,7 @@ server_properties, %% connection.block, connection.unblock handler block_handler, + blocked_by = sets:new([{version, 2}]), closing = false %% #closing{} | false }). @@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) -> server_misbehaved_close(AmqpError, State); handle_cast({server_close, #'connection.close'{} = Close}, State) -> server_initiated_close(Close, State); -handle_cast({register_blocked_handler, HandlerPid}, State) -> +handle_cast({register_blocked_handler, HandlerPid}, + #state{blocked_by = BlockedBy} = State) -> Ref = erlang:monitor(process, HandlerPid), - {noreply, State#state{block_handler = {HandlerPid, Ref}}}. + State1 = State#state{block_handler = {HandlerPid, Ref}}, + %% If an alarm is already active, immediately block the handler. + _ = case sets:is_empty(BlockedBy) of + false -> + HandlerPid ! #'connection.blocked'{}; + true -> + ok + end, + {noreply, State1}; +handle_cast({conserve_resources, Source, Conserve}, + #state{blocked_by = BlockedBy} = State) -> + WasNotBlocked = sets:is_empty(BlockedBy), + BlockedBy1 = case Conserve of + true -> + sets:add_element(Source, BlockedBy); + false -> + sets:del_element(Source, BlockedBy) + end, + State1 = State#state{blocked_by = BlockedBy1}, + case sets:is_empty(BlockedBy1) of + true -> + handle_method(#'connection.unblocked'{}, State1); + false when WasNotBlocked -> + handle_method(#'connection.blocked'{}, State1); + false -> + {noreply, State1} + end. %% @private handle_info({'DOWN', _, process, BlockHandler, Reason}, diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index 9f9a601bb2..62d34c4977 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -13,7 +13,8 @@ -deprecated([{force_event_refresh, 1, eventually}]). %% Internal --export([list_local/0]). +-export([list_local/0, + conserve_resources/3]). %% For testing only -export([extract_extra_auth_props/4]). @@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), + _ = rabbit_alarm:register( + Pid, {?MODULE, conserve_resources, []}), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch exit:#amqp_error{name = Reason = not_allowed} -> @@ -252,3 +255,9 @@ disconnect(Pid, Infos) -> pg_local:leave(rabbit_direct, Pid), rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). + +-spec conserve_resources(pid(), + rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> 'ok'. +conserve_resources(ChannelPid, Source, {_, Conserve, _}) -> + gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}). diff --git a/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl index 00a3b9f7f4..8ca9ae238a 100644 --- a/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl @@ -16,12 +16,20 @@ all() -> [ - {group, all_tests} + {group, network_connection}, + {group, direct_connection} ]. groups() -> [ - {all_tests, [], all_tests()} + {network_connection, [], [ + dest_resource_alarm_on_confirm, + dest_resource_alarm_on_publish, + dest_resource_alarm_no_ack + ]}, + {direct_connection, [], [ + dest_resource_alarm_on_confirm + ]} ]. all_tests() -> @@ -51,13 +59,21 @@ end_per_suite(Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(_, Config) -> +init_per_group(network_connection, Config) -> rabbit_ct_helpers:set_config( Config, [{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)}, {shovel_source_idx, 1}, {shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)}, {shovel_dest_idx, 0} + ]); +init_per_group(direct_connection, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)}, + {shovel_source_idx, 1}, + {shovel_dest_uri, <<"amqp://">>}, + {shovel_dest_idx, 0} ]). end_per_group(_, Config) ->