Block Web MQTT connection if memory or disk alarm

Previously (until RabbitMQ v3.11.x), a memory or disk alarm did
not block the Web MQTT connection because this feature was only
implemented half way through: The part that registers the Web MQTT
connection with rabbit_alarm was missing.
This commit is contained in:
David Ansari 2022-12-28 15:24:28 +00:00
parent a8b69b43c1
commit fb6c8da2fc
4 changed files with 66 additions and 17 deletions

View File

@ -74,8 +74,7 @@ init(Ref) ->
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
rabbit_log_connection:debug("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
rabbit_alarm:register(
self(), {?MODULE, conserve_resources, []}),
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
ProcessorState = rabbit_mqtt_processor:initial_state(RealSocket, ConnName),

View File

@ -110,15 +110,15 @@ block(Config) ->
% %% Let it block
timer:sleep(100),
%% Blocked, but still will publish
%% Blocked, but still will publish when unblocked
puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Now blocked">>, 1000),
puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Still blocked">>, 1000),
%% Unblock
rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
ok = expect_publishes(<<"Topic1">>, [<<"Not blocked yet">>,
<<"Now blocked">>,
<<"Still blocked">>]),
<<"Now blocked">>,
<<"Still blocked">>]),
ok = emqtt:disconnect(C).
block_connack_timeout(Config) ->

View File

@ -18,7 +18,9 @@
websocket_info/2,
terminate/3
]).
-export([close_connection/2]).
-export([conserve_resources/3,
close_connection/2]).
%% cowboy_sub_protocol
-export([upgrade/4,
@ -97,6 +99,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
rabbit_log_connection:info("Accepting Web MQTT connection ~s", [ConnName]),
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
PState = rabbit_mqtt_processor:initial_state(
rabbit_net:unwrap_socket(Sock),
ConnName,
@ -118,6 +121,13 @@ close_connection(Pid, Reason) ->
sys:terminate(Pid, Reason),
ok.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
ok.
-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
@ -140,8 +150,7 @@ websocket_handle(Frame, State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_info({conserve_resources, Conserve}, State) ->
NewState = State#state{conserve = Conserve},
handle_credits(NewState);
handle_credits(State#state{conserve = Conserve});
websocket_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
handle_credits(State);
@ -305,12 +314,12 @@ stop(State, CloseCode, Error0) ->
{[{close, CloseCode, Error}], State}.
handle_credits(State0) ->
case control_throttle(State0) of
State = #state{connection_state = running} ->
{[{active, true}], State, hibernate};
State ->
{[], State, hibernate}
end.
State = #state{connection_state = CS} = control_throttle(State0),
Active = case CS of
running -> true;
blocked -> false
end,
{[{active, Active}], State, hibernate}.
control_throttle(State = #state{connection_state = CS,
conserve = Conserve,

View File

@ -33,7 +33,7 @@ all() ->
groups() ->
[
{non_parallel_tests, [],
[connection
[block
, pubsub_shared_connection
, pubsub_separate_connections
, last_will_enabled_disconnect
@ -99,8 +99,30 @@ maybe_stop_inets(Testcase) ->
ok
end.
connection(Config) ->
C = ws_connect(?FUNCTION_NAME, Config),
%% -------------------------------------------------------------------
%% Testsuite cases
%% -------------------------------------------------------------------
block(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C = ws_connect(ClientId, Config),
{ok, _, _} = emqtt:subscribe(C, Topic),
{ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]),
ok = rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]),
%% Let it block
timer:sleep(100),
%% Blocked, but still will publish when unblocked
puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000),
puback_timeout = publish_qos1_timeout(C, Topic, <<"Still blocked">>, 1000),
%% Unblock
rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
ok = expect_publishes(C, Topic, [<<"Not blocked yet">>,
<<"Now blocked">>,
<<"Still blocked">>]),
ok = emqtt:disconnect(C).
pubsub_shared_connection(Config) ->
@ -332,3 +354,22 @@ expect_publishes(ClientPid, Topic, [Payload|Rest]) ->
after 1000 ->
{publish_not_received, Payload}
end.
publish_qos1_timeout(Client, Topic, Payload, Timeout) ->
Mref = erlang:monitor(process, Client),
ok = emqtt:publish_async(Client, Topic, #{}, Payload, [{qos, 1}], infinity,
{fun ?MODULE:sync_publish_result/3, [self(), Mref]}),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
Reply;
{'DOWN', Mref, process, Client, Reason} ->
ct:fail("client is down: ~tp", [Reason])
after
Timeout ->
erlang:demonitor(Mref, [flush]),
puback_timeout
end.
sync_publish_result(Caller, Mref, Result) ->
erlang:send(Caller, {Mref, Result}).